summary refs log tree commit diff stats
path: root/modules/http2/h2_mplx.c
diff options
context:
space:
mode:
Diffstat (limited to 'modules/http2/h2_mplx.c')
-rw-r--r-- modules/http2/h2_mplx.c 1834
1 files changed, 908 insertions, 926 deletions
diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c
index 0fae117..2aeea42 100644
--- a/modules/http2/h2_mplx.c
+++ b/modules/http2/h2_mplx.c
@@ -26,7 +26,9 @@
#include <httpd.h>
#include <http_core.h>
+#include <http_connection.h>
#include <http_log.h>
+#include <http_protocol.h>
#include <mpm_common.h>
@@ -36,15 +38,14 @@
#include "h2_private.h"
#include "h2_bucket_beam.h"
#include "h2_config.h"
-#include "h2_conn.h"
-#include "h2_ctx.h"
-#include "h2_h2.h"
+#include "h2_c1.h"
+#include "h2_conn_ctx.h"
+#include "h2_protocol.h"
#include "h2_mplx.h"
-#include "h2_ngn_shed.h"
#include "h2_request.h"
#include "h2_stream.h"
#include "h2_session.h"
-#include "h2_task.h"
+#include "h2_c2.h"
#include "h2_workers.h"
#include "h2_util.h"
@@ -54,16 +55,40 @@ typedef struct {
h2_mplx *m;
h2_stream *stream;
apr_time_t now;
+ apr_size_t count;
} stream_iter_ctx;
-apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s)
+static conn_rec *c2_prod_next(void *baton, int *phas_more); /* these three callbacks are passed to */
+static void c2_prod_done(void *baton, conn_rec *c2); /* h2_workers_register() in h2_mplx_c1_create(), */
+static void workers_shutdown(void *baton, int graceful); /* with the h2_mplx itself as the baton */
+/* Forward declarations of static helpers defined elsewhere in this file. */
+static void s_mplx_be_happy(h2_mplx *m, conn_rec *c, h2_conn_ctx_t *conn_ctx);
+static void m_be_annoyed(h2_mplx *m);
+/* Pollset helpers: _create is called from h2_mplx_c1_create(), _poll from h2_mplx_c1_poll(). */
+static apr_status_t mplx_pollset_create(h2_mplx *m);
+static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
+ stream_ev_callback *on_stream_input,
+ stream_ev_callback *on_stream_output,
+ void *on_ctx);
+/* Pool handed to h2_mplx_c1_child_init(); stored there and kept for the file. */
+static apr_pool_t *pchild;
+
+/* APR callback invoked if allocation fails.
+ * Installed as the abort function of transit pools via apr_pool_abort_set()
+ * in c2_transit_create(). It calls ap_abort_on_oom(), which is not expected
+ * to return; `retcode` is handed back unchanged in case it ever does. */
+static int abort_on_oom(int retcode)
{
+ ap_abort_on_oom();
+ return retcode; /* unreachable, hopefully. */
+}
+
+apr_status_t h2_mplx_c1_child_init(apr_pool_t *pool, server_rec *s)
+{ /* Records `pool` in the file-static `pchild`; `s` is not used in this
+   * function. Always returns APR_SUCCESS. */
+ pchild = pool;
return APR_SUCCESS;
}
#define H2_MPLX_ENTER(m) \
- do { apr_status_t rv; if ((rv = apr_thread_mutex_lock(m->lock)) != APR_SUCCESS) {\
- return rv;\
+ do { apr_status_t rv_lock; if ((rv_lock = apr_thread_mutex_lock(m->lock)) != APR_SUCCESS) {\
+ return rv_lock;\
} } while(0)
#define H2_MPLX_LEAVE(m) \
@@ -72,71 +97,150 @@ apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s)
#define H2_MPLX_ENTER_ALWAYS(m) \
apr_thread_mutex_lock(m->lock)
-#define H2_MPLX_ENTER_MAYBE(m, lock) \
- if (lock) apr_thread_mutex_lock(m->lock)
-
-#define H2_MPLX_LEAVE_MAYBE(m, lock) \
- if (lock) apr_thread_mutex_unlock(m->lock)
+#define H2_MPLX_ENTER_MAYBE(m, dolock) \
+ if (dolock) apr_thread_mutex_lock(m->lock)
-static void check_data_for(h2_mplx *m, h2_stream *stream, int lock);
+#define H2_MPLX_LEAVE_MAYBE(m, dolock) \
+ if (dolock) apr_thread_mutex_unlock(m->lock)
-static void stream_output_consumed(void *ctx,
- h2_bucket_beam *beam, apr_off_t length)
+static void c1_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length)
{
- h2_stream *stream = ctx;
- h2_task *task = stream->task;
-
- if (length > 0 && task && task->assigned) {
- h2_req_engine_out_consumed(task->assigned, task->c, length);
- }
+ h2_stream_in_consumed(ctx, length);
}
-static void stream_input_ev(void *ctx, h2_bucket_beam *beam)
+static int stream_is_running(h2_stream *stream)
{
- h2_stream *stream = ctx;
- h2_mplx *m = stream->session->mplx;
- apr_atomic_set32(&m->event_pending, 1);
+ h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(stream->c2);
+ return conn_ctx && apr_atomic_read32(&conn_ctx->started) != 0
+ && apr_atomic_read32(&conn_ctx->done) == 0;
}
-static void stream_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length)
+int h2_mplx_c1_stream_is_running(h2_mplx *m, h2_stream *stream)
{
- h2_stream_in_consumed(ctx, length);
+ int rv;
+
+ H2_MPLX_ENTER(m);
+ rv = stream_is_running(stream);
+ H2_MPLX_LEAVE(m);
+ return rv;
}
-static void stream_joined(h2_mplx *m, h2_stream *stream)
+static void c1c2_stream_joined(h2_mplx *m, h2_stream *stream)
{
- ap_assert(!stream->task || stream->task->worker_done);
+ ap_assert(!stream_is_running(stream));
h2_ihash_remove(m->shold, stream->id);
- h2_ihash_add(m->spurge, stream);
+ APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream;
}
-static void stream_cleanup(h2_mplx *m, h2_stream *stream)
+static void m_stream_cleanup(h2_mplx *m, h2_stream *stream)
{
- ap_assert(stream->state == H2_SS_CLEANUP);
+ h2_conn_ctx_t *c2_ctx = h2_conn_ctx_get(stream->c2);
- if (stream->input) {
- h2_beam_on_consumed(stream->input, NULL, NULL, NULL);
- h2_beam_abort(stream->input);
- }
- if (stream->output) {
- h2_beam_on_produced(stream->output, NULL, NULL);
- h2_beam_leave(stream->output);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
+ H2_STRM_MSG(stream, "cleanup, unsubscribing from beam events"));
+ if (c2_ctx) {
+ if (c2_ctx->beam_out) {
+ h2_beam_on_was_empty(c2_ctx->beam_out, NULL, NULL);
+ }
+ if (c2_ctx->beam_in) {
+ h2_beam_on_send(c2_ctx->beam_in, NULL, NULL);
+ h2_beam_on_received(c2_ctx->beam_in, NULL, NULL);
+ h2_beam_on_eagain(c2_ctx->beam_in, NULL, NULL);
+ h2_beam_on_consumed(c2_ctx->beam_in, NULL, NULL);
+ }
}
-
- h2_stream_cleanup(stream);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
+ H2_STRM_MSG(stream, "cleanup, removing from registries"));
+ ap_assert(stream->state == H2_SS_CLEANUP);
+ h2_stream_cleanup(stream);
h2_ihash_remove(m->streams, stream->id);
h2_iq_remove(m->q, stream->id);
- h2_ififo_remove(m->readyq, stream->id);
- h2_ihash_add(m->shold, stream);
-
- if (!stream->task || stream->task->worker_done) {
- stream_joined(m, stream);
+
+ if (c2_ctx) {
+ if (!stream_is_running(stream)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
+ H2_STRM_MSG(stream, "cleanup, c2 is done, move to spurge"));
+ /* processing has finished */
+ APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream;
+ }
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
+ H2_STRM_MSG(stream, "cleanup, c2 is running, abort"));
+ /* c2 is still running */
+ h2_c2_abort(stream->c2, m->c1);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
+ H2_STRM_MSG(stream, "cleanup, c2 is done, move to shold"));
+ h2_ihash_add(m->shold, stream);
+ }
}
- else if (stream->task) {
- stream->task->c->aborted = 1;
- apr_thread_cond_broadcast(m->task_thawed);
+ else {
+ /* never started */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
+ H2_STRM_MSG(stream, "cleanup, never started, move to spurge"));
+ APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream;
+ }
+}
+
+/*
+ * Allocate a new transit: a pool with a private allocator, plus a bucket
+ * allocator living in that pool.
+ *
+ * The pool is a child of m->pool but owns its allocator. We create a pool
+ * with its own allocator to be used for processing a request; this is the
+ * only way to have the processing independent of its parent pool in the
+ * sense that it can work in another thread.
+ *
+ * The pool's abort function is abort_on_oom(). The transit struct itself is
+ * allocated from the new pool, so destroying transit->pool releases it too.
+ *
+ * Failure to create the allocator or the pool is logged and then handed to
+ * ap_abort_on_oom(); the NULL return after that is not expected to be
+ * reached.
+ */
+static h2_c2_transit *c2_transit_create(h2_mplx *m)
+{
+    apr_allocator_t *alloc;
+    apr_pool_t *pool;
+    h2_c2_transit *t;
+    apr_status_t status = apr_allocator_create(&alloc);
+
+    if (APR_SUCCESS == status) {
+        apr_allocator_max_free_set(alloc, ap_max_mem_free);
+        status = apr_pool_create_ex(&pool, m->pool, NULL, alloc);
+    }
+
+    if (APR_SUCCESS != status) {
+        /* maybe the log goes through, maybe not. */
+        ap_log_cerror(APLOG_MARK, APLOG_ERR, status, m->c1,
+                      APLOGNO(10004) "h2_mplx: create transit pool");
+        ap_abort_on_oom();
+        return NULL; /* should never be reached. */
+    }
+
+    apr_allocator_owner_set(alloc, pool);
+    apr_pool_abort_set(abort_on_oom, pool);
+    apr_pool_tag(pool, "h2_c2_transit");
+
+    t = apr_pcalloc(pool, sizeof(*t));
+    t->pool = pool;
+    t->bucket_alloc = apr_bucket_alloc_create(pool);
+    return t;
+}
+
+/* Dispose of a transit by destroying its pool. The transit struct is
+ * allocated from that very pool (see c2_transit_create), so `transit`
+ * must not be touched afterwards. */
+static void c2_transit_destroy(h2_c2_transit *transit)
+{
+    apr_pool_t *pool = transit->pool;
+
+    apr_pool_destroy(pool);
+}
+
+/* Obtain a transit for processing: reuse a spare one popped from
+ * m->c2_transits when the array is not empty, otherwise create a new one. */
+static h2_c2_transit *c2_transit_get(h2_mplx *m)
+{
+    h2_c2_transit **spare = apr_array_pop(m->c2_transits);
+
+    return spare ? *spare : c2_transit_create(m);
+}
+
+/* Hand a transit back after use. It is kept in m->c2_transits for reuse
+ * while fewer than m->max_spare_transits spares are held (and the array
+ * count is below APR_INT32_MAX); otherwise it is destroyed. */
+static void c2_transit_recycle(h2_mplx *m, h2_c2_transit *transit)
+{
+    const int spares = m->c2_transits->nelts;
+    const int has_room = (spares < APR_INT32_MAX)
+                         && ((apr_uint32_t)spares < m->max_spare_transits);
+
+    if (has_room) {
+        APR_ARRAY_PUSH(m->c2_transits, h2_c2_transit*) = transit;
+    }
+    else {
+        c2_transit_destroy(transit);
}
}
@@ -151,208 +255,118 @@ static void stream_cleanup(h2_mplx *m, h2_stream *stream)
* their HTTP/1 cousins, the separate allocator seems to work better
* than protecting a shared h2_session one with an own lock.
*/
-h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
- const h2_config *conf,
- h2_workers *workers)
+h2_mplx *h2_mplx_c1_create(int child_num, apr_uint32_t id, h2_stream *stream0,
+ server_rec *s, apr_pool_t *parent,
+ h2_workers *workers)
{
+ h2_conn_ctx_t *conn_ctx;
apr_status_t status = APR_SUCCESS;
apr_allocator_t *allocator;
- apr_thread_mutex_t *mutex;
- h2_mplx *m;
- h2_ctx *ctx = h2_ctx_get(c, 0);
- ap_assert(conf);
+ apr_thread_mutex_t *mutex = NULL;
+ h2_mplx *m = NULL;
m = apr_pcalloc(parent, sizeof(h2_mplx));
- if (m) {
- m->id = c->id;
- m->c = c;
- m->s = (ctx? h2_ctx_server_get(ctx) : NULL);
- if (!m->s) {
- m->s = c->base_server;
- }
-
- /* We create a pool with its own allocator to be used for
- * processing slave connections. This is the only way to have the
- * processing independant of its parent pool in the sense that it
- * can work in another thread. Also, the new allocator needs its own
- * mutex to synchronize sub-pools.
- */
- status = apr_allocator_create(&allocator);
- if (status != APR_SUCCESS) {
- return NULL;
- }
- apr_allocator_max_free_set(allocator, ap_max_mem_free);
- apr_pool_create_ex(&m->pool, parent, NULL, allocator);
- if (!m->pool) {
- apr_allocator_destroy(allocator);
- return NULL;
- }
- apr_pool_tag(m->pool, "h2_mplx");
- apr_allocator_owner_set(allocator, m->pool);
- status = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT,
- m->pool);
- if (status != APR_SUCCESS) {
- apr_pool_destroy(m->pool);
- return NULL;
- }
- apr_allocator_mutex_set(allocator, mutex);
-
- status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT,
- m->pool);
- if (status != APR_SUCCESS) {
- apr_pool_destroy(m->pool);
- return NULL;
- }
-
- status = apr_thread_cond_create(&m->task_thawed, m->pool);
- if (status != APR_SUCCESS) {
- apr_pool_destroy(m->pool);
- return NULL;
- }
-
- m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS);
- m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
-
- m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
- m->sredo = h2_ihash_create(m->pool, offsetof(h2_stream,id));
- m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
- m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
- m->q = h2_iq_create(m->pool, m->max_streams);
-
- status = h2_ififo_set_create(&m->readyq, m->pool, m->max_streams);
- if (status != APR_SUCCESS) {
- apr_pool_destroy(m->pool);
- return NULL;
- }
+ m->stream0 = stream0;
+ m->c1 = stream0->c2;
+ m->s = s;
+ m->child_num = child_num;
+ m->id = id;
+
+ /* We create a pool with its own allocator to be used for
+ * processing secondary connections. This is the only way to have the
+ * processing independent of its parent pool in the sense that it
+ * can work in another thread. Also, the new allocator needs its own
+ * mutex to synchronize sub-pools.
+ */
+ status = apr_allocator_create(&allocator);
+ if (status != APR_SUCCESS) {
+ allocator = NULL;
+ goto failure;
+ }
+
+ apr_allocator_max_free_set(allocator, ap_max_mem_free);
+ apr_pool_create_ex(&m->pool, parent, NULL, allocator);
+ if (!m->pool) goto failure;
+
+ apr_pool_tag(m->pool, "h2_mplx");
+ apr_allocator_owner_set(allocator, m->pool);
+
+ status = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT,
+ m->pool);
+ if (APR_SUCCESS != status) goto failure;
+ apr_allocator_mutex_set(allocator, mutex);
+
+ status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT,
+ m->pool);
+ if (APR_SUCCESS != status) goto failure;
+
+ m->max_streams = h2_config_sgeti(s, H2_CONF_MAX_STREAMS);
+ m->stream_max_mem = h2_config_sgeti(s, H2_CONF_STREAM_MAX_MEM);
+
+ m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+ m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+ m->spurge = apr_array_make(m->pool, 10, sizeof(h2_stream*));
+ m->q = h2_iq_create(m->pool, m->max_streams);
+
+ m->workers = workers;
+ m->processing_max = H2MIN(h2_workers_get_max_workers(workers), m->max_streams);
+ m->processing_limit = 6; /* the original h1 max parallel connections */
+ m->last_mood_change = apr_time_now();
+ m->mood_update_interval = apr_time_from_msec(100);
+
+ status = mplx_pollset_create(m);
+ if (APR_SUCCESS != status) {
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, status, m->c1, APLOGNO(10308)
+ "nghttp2: could not create pollset");
+ goto failure;
+ }
+ m->streams_ev_in = apr_array_make(m->pool, 10, sizeof(h2_stream*));
+ m->streams_ev_out = apr_array_make(m->pool, 10, sizeof(h2_stream*));
+
+ m->streams_input_read = h2_iq_create(m->pool, 10);
+ m->streams_output_written = h2_iq_create(m->pool, 10);
+ status = apr_thread_mutex_create(&m->poll_lock, APR_THREAD_MUTEX_DEFAULT,
+ m->pool);
+ if (APR_SUCCESS != status) goto failure;
+
+ conn_ctx = h2_conn_ctx_get(m->c1);
+ if (conn_ctx->pfd.reqevents) {
+ apr_pollset_add(m->pollset, &conn_ctx->pfd);
+ }
+
+ m->max_spare_transits = 3;
+ m->c2_transits = apr_array_make(m->pool, (int)m->max_spare_transits,
+ sizeof(h2_c2_transit*));
+
+ m->producer = h2_workers_register(workers, m->pool,
+ apr_psprintf(m->pool, "h2-%u",
+ (unsigned int)m->id),
+ c2_prod_next, c2_prod_done,
+ workers_shutdown, m);
+ return m;
- m->workers = workers;
- m->max_active = workers->max_workers;
- m->limit_active = 6; /* the original h1 max parallel connections */
- m->last_limit_change = m->last_idle_block = apr_time_now();
- m->limit_change_interval = apr_time_from_msec(100);
-
- m->spare_slaves = apr_array_make(m->pool, 10, sizeof(conn_rec*));
-
- m->ngn_shed = h2_ngn_shed_create(m->pool, m->c, m->max_streams,
- m->stream_max_mem);
- h2_ngn_shed_set_ctx(m->ngn_shed , m);
+failure:
+ if (m->pool) {
+ apr_pool_destroy(m->pool);
}
- return m;
+ else if (allocator) {
+ apr_allocator_destroy(allocator);
+ }
+ return NULL;
}
-int h2_mplx_shutdown(h2_mplx *m)
+int h2_mplx_c1_shutdown(h2_mplx *m)
{
- int max_stream_started = 0;
+ int max_stream_id_started = 0;
H2_MPLX_ENTER(m);
- max_stream_started = m->max_stream_started;
+ max_stream_id_started = m->max_stream_id_started;
/* Clear schedule queue, disabling existing streams from starting */
h2_iq_clear(m->q);
H2_MPLX_LEAVE(m);
- return max_stream_started;
-}
-
-static int input_consumed_signal(h2_mplx *m, h2_stream *stream)
-{
- if (stream->input) {
- return h2_beam_report_consumption(stream->input);
- }
- return 0;
-}
-
-static int report_consumption_iter(void *ctx, void *val)
-{
- h2_stream *stream = val;
- h2_mplx *m = ctx;
-
- input_consumed_signal(m, stream);
- if (stream->state == H2_SS_CLOSED_L
- && (!stream->task || stream->task->worker_done)) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
- H2_STRM_LOG(APLOGNO(10026), stream, "remote close missing"));
- nghttp2_submit_rst_stream(stream->session->ngh2, NGHTTP2_FLAG_NONE,
- stream->id, NGHTTP2_NO_ERROR);
- }
- return 1;
-}
-
-static int output_consumed_signal(h2_mplx *m, h2_task *task)
-{
- if (task->output.beam) {
- return h2_beam_report_consumption(task->output.beam);
- }
- return 0;
-}
-
-static int stream_destroy_iter(void *ctx, void *val)
-{
- h2_mplx *m = ctx;
- h2_stream *stream = val;
-
- h2_ihash_remove(m->spurge, stream->id);
- ap_assert(stream->state == H2_SS_CLEANUP);
-
- if (stream->input) {
- /* Process outstanding events before destruction */
- input_consumed_signal(m, stream);
- h2_beam_log(stream->input, m->c, APLOG_TRACE2, "stream_destroy");
- h2_beam_destroy(stream->input);
- stream->input = NULL;
- }
-
- if (stream->task) {
- h2_task *task = stream->task;
- conn_rec *slave;
- int reuse_slave = 0;
-
- stream->task = NULL;
- slave = task->c;
- if (slave) {
- /* On non-serialized requests, the IO logging has not accounted for any
- * meta data send over the network: response headers and h2 frame headers. we
- * counted this on the stream and need to add this now.
- * This is supposed to happen before the EOR bucket triggers the
- * logging of the transaction. *fingers crossed* */
- if (task->request && !task->request->serialize && h2_task_logio_add_bytes_out) {
- apr_off_t unaccounted = stream->out_frame_octets - stream->out_data_octets;
- if (unaccounted > 0) {
- h2_task_logio_add_bytes_out(slave, unaccounted);
- }
- }
-
- if (m->s->keep_alive_max == 0 || slave->keepalives < m->s->keep_alive_max) {
- reuse_slave = ((m->spare_slaves->nelts < (m->limit_active * 3 / 2))
- && !task->rst_error);
- }
-
- if (reuse_slave && slave->keepalive == AP_CONN_KEEPALIVE) {
- h2_beam_log(task->output.beam, m->c, APLOG_DEBUG,
- APLOGNO(03385) "h2_task_destroy, reuse slave");
- h2_task_destroy(task);
- APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave;
- }
- else {
- h2_beam_log(task->output.beam, m->c, APLOG_TRACE1,
- "h2_task_destroy, destroy slave");
- h2_slave_destroy(slave);
- }
- }
- }
- h2_stream_destroy(stream);
- return 0;
-}
-
-static void purge_streams(h2_mplx *m, int lock)
-{
- if (!h2_ihash_empty(m->spurge)) {
- H2_MPLX_ENTER_MAYBE(m, lock);
- while (!h2_ihash_iter(m->spurge, stream_destroy_iter, m)) {
- /* repeat until empty */
- }
- H2_MPLX_LEAVE_MAYBE(m, lock);
- }
+ return max_stream_id_started;
}
typedef struct {
@@ -360,13 +374,13 @@ typedef struct {
void *ctx;
} stream_iter_ctx_t;
-static int stream_iter_wrap(void *ctx, void *stream)
+static int m_stream_iter_wrap(void *ctx, void *stream)
{
stream_iter_ctx_t *x = ctx;
return x->cb(stream, x->ctx);
}
-apr_status_t h2_mplx_stream_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx)
+apr_status_t h2_mplx_c1_streams_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx)
{
stream_iter_ctx_t x;
@@ -374,276 +388,260 @@ apr_status_t h2_mplx_stream_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx)
x.cb = cb;
x.ctx = ctx;
- h2_ihash_iter(m->streams, stream_iter_wrap, &x);
+ h2_ihash_iter(m->streams, m_stream_iter_wrap, &x);
H2_MPLX_LEAVE(m);
return APR_SUCCESS;
}
-static int report_stream_iter(void *ctx, void *val) {
+typedef struct { /* Tally filled in by m_stream_want_send_data() while iterating m->streams. */
+ int stream_count; /* number of streams visited */
+ int stream_want_send; /* of those, how many h2_stream_wants_send_data() reported true for */
+} stream_iter_aws_t;
+
+/* h2_ihash_iter() callback: counts the visited stream in the
+ * stream_iter_aws_t passed as ctx, and additionally counts it as wanting
+ * to send when h2_stream_wants_send_data() says so. Always returns 1.
+ * NOTE(review): 1 presumably tells h2_ihash_iter() to keep iterating --
+ * confirm against h2_util. */
+static int m_stream_want_send_data(void *ctx, void *stream)
+{
+    stream_iter_aws_t *tally = ctx;
+
+    tally->stream_count += 1;
+    if (h2_stream_wants_send_data(stream)) {
+        tally->stream_want_send += 1;
+    }
+    return 1;
+}
+
+/*
+ * Report whether every stream currently registered in m->streams wants to
+ * send data, as judged by h2_stream_wants_send_data().
+ *
+ * Returns non-zero only if there is at least one stream and all of them
+ * want to send; returns 0 when there are no streams.
+ *
+ * The mplx lock is held while iterating. If the lock cannot be taken,
+ * nothing is counted and 0 is returned: this function yields a boolean, so
+ * the apr_status_t that H2_MPLX_ENTER() returns on lock failure must not
+ * leak out and be read by the caller as a "true" answer.
+ */
+int h2_mplx_c1_all_streams_want_send_data(h2_mplx *m)
+{
+    stream_iter_aws_t x;
+
+    x.stream_count = 0;
+    x.stream_want_send = 0;
+    if (apr_thread_mutex_lock(m->lock) == APR_SUCCESS) {
+        h2_ihash_iter(m->streams, m_stream_want_send_data, &x);
+        H2_MPLX_LEAVE(m);
+    }
+    return x.stream_count && (x.stream_count == x.stream_want_send);
+}
+
+static int m_report_stream_iter(void *ctx, void *val) {
h2_mplx *m = ctx;
h2_stream *stream = val;
- h2_task *task = stream->task;
- if (APLOGctrace1(m->c)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- H2_STRM_MSG(stream, "started=%d, scheduled=%d, ready=%d, out_buffer=%ld"),
- !!stream->task, stream->scheduled, h2_stream_is_ready(stream),
- (long)h2_beam_get_buffered(stream->output));
- }
- if (task) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
+ h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(stream->c2);
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c1,
+ H2_STRM_MSG(stream, "started=%d, scheduled=%d, ready=%d, out_buffer=%ld"),
+ !!stream->c2, stream->scheduled, h2_stream_is_ready(stream),
+ (long)(stream->output? h2_beam_get_buffered(stream->output) : -1));
+ if (conn_ctx) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, /* NO APLOGNO */
H2_STRM_MSG(stream, "->03198: %s %s %s"
- "[started=%d/done=%d/frozen=%d]"),
- task->request->method, task->request->authority,
- task->request->path, task->worker_started,
- task->worker_done, task->frozen);
+ "[started=%u/done=%u]"),
+ conn_ctx->request->method, conn_ctx->request->authority,
+ conn_ctx->request->path,
+ apr_atomic_read32(&conn_ctx->started),
+ apr_atomic_read32(&conn_ctx->done));
}
else {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
- H2_STRM_MSG(stream, "->03198: no task"));
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, /* NO APLOGNO */
+ H2_STRM_MSG(stream, "->03198: not started"));
}
return 1;
}
-static int unexpected_stream_iter(void *ctx, void *val) {
+static int m_unexpected_stream_iter(void *ctx, void *val) {
h2_mplx *m = ctx;
h2_stream *stream = val;
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c1, /* NO APLOGNO */
H2_STRM_MSG(stream, "unexpected, started=%d, scheduled=%d, ready=%d"),
- !!stream->task, stream->scheduled, h2_stream_is_ready(stream));
+ !!stream->c2, stream->scheduled, h2_stream_is_ready(stream));
return 1;
}
-static int stream_cancel_iter(void *ctx, void *val) {
+static int m_stream_cancel_iter(void *ctx, void *val) {
h2_mplx *m = ctx;
h2_stream *stream = val;
- /* disabled input consumed reporting */
- if (stream->input) {
- h2_beam_on_consumed(stream->input, NULL, NULL, NULL);
- }
/* take over event monitoring */
h2_stream_set_monitor(stream, NULL);
/* Reset, should transit to CLOSED state */
h2_stream_rst(stream, H2_ERR_NO_ERROR);
/* All connection data has been sent, simulate cleanup */
h2_stream_dispatch(stream, H2_SEV_EOS_SENT);
- stream_cleanup(m, stream);
+ m_stream_cleanup(m, stream);
return 0;
}
-void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
+static void c1_purge_streams(h2_mplx *m);
+
+void h2_mplx_c1_destroy(h2_mplx *m)
{
apr_status_t status;
- int i, wait_secs = 60;
+ unsigned int i, wait_secs = 60;
+ int old_aborted;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
+ H2_MPLX_MSG(m, "start release"));
/* How to shut down a h2 connection:
- * 0. abort and tell the workers that no more tasks will come from us */
- m->aborted = 1;
- h2_workers_unregister(m->workers, m);
-
+ * 0. abort and tell the workers that no more work will come from us */
+ m->shutdown = m->aborted = 1;
+
H2_MPLX_ENTER_ALWAYS(m);
+ /* While really terminating any c2 connections, treat the master
+ * connection as aborted. It's not as if we could send any more data
+ * at this point. */
+ old_aborted = m->c1->aborted;
+ m->c1->aborted = 1;
+
/* How to shut down a h2 connection:
* 1. cancel all streams still active */
- while (!h2_ihash_iter(m->streams, stream_cancel_iter, m)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
+ H2_MPLX_MSG(m, "release, %u/%u/%d streams (total/hold/purge), %d streams"),
+ h2_ihash_count(m->streams),
+ h2_ihash_count(m->shold),
+ m->spurge->nelts, m->processing_count);
+ while (!h2_ihash_iter(m->streams, m_stream_cancel_iter, m)) {
/* until empty */
}
- /* 2. terminate ngn_shed, no more streams
- * should be scheduled or in the active set */
- h2_ngn_shed_abort(m->ngn_shed);
+ /* 2. no more streams should be scheduled or in the active set */
ap_assert(h2_ihash_empty(m->streams));
ap_assert(h2_iq_empty(m->q));
/* 3. while workers are busy on this connection, meaning they
- * are processing tasks from this connection, wait on them finishing
+ * are processing streams from this connection, wait on them finishing
* in order to wake us and let us check again.
- * Eventually, this has to succeed. */
- m->join_wait = wait;
- for (i = 0; h2_ihash_count(m->shold) > 0; ++i) {
- status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
+ * Eventually, this has to succeed. */
+ if (!m->join_wait) {
+ apr_thread_cond_create(&m->join_wait, m->pool);
+ }
+
+ for (i = 0; h2_ihash_count(m->shold) > 0; ++i) {
+ status = apr_thread_cond_timedwait(m->join_wait, m->lock, apr_time_from_sec(wait_secs));
if (APR_STATUS_IS_TIMEUP(status)) {
/* This can happen if we have very long running requests
* that do not time out on IO. */
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03198)
- "h2_mplx(%ld): waited %d sec for %d tasks",
- m->id, i*wait_secs, (int)h2_ihash_count(m->shold));
- h2_ihash_iter(m->shold, report_stream_iter, m);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, APLOGNO(03198)
+ H2_MPLX_MSG(m, "waited %u sec for %u streams"),
+ i*wait_secs, h2_ihash_count(m->shold));
+ h2_ihash_iter(m->shold, m_report_stream_iter, m);
}
}
- ap_assert(m->tasks_active == 0);
- m->join_wait = NULL;
-
- /* 4. close the h2_req_enginge shed */
- h2_ngn_shed_destroy(m->ngn_shed);
- m->ngn_shed = NULL;
-
+
+ H2_MPLX_LEAVE(m);
+ h2_workers_join(m->workers, m->producer);
+ H2_MPLX_ENTER_ALWAYS(m);
+
/* 4. With all workers done, all streams should be in spurge */
+ ap_assert(m->processing_count == 0);
if (!h2_ihash_empty(m->shold)) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03516)
- "h2_mplx(%ld): unexpected %d streams in hold",
- m->id, (int)h2_ihash_count(m->shold));
- h2_ihash_iter(m->shold, unexpected_stream_iter, m);
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c1, APLOGNO(03516)
+ H2_MPLX_MSG(m, "unexpected %u streams in hold"),
+ h2_ihash_count(m->shold));
+ h2_ihash_iter(m->shold, m_unexpected_stream_iter, m);
}
-
+
+ c1_purge_streams(m);
+
+ m->c1->aborted = old_aborted;
H2_MPLX_LEAVE(m);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): released", m->id);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
+ H2_MPLX_MSG(m, "released"));
}
-apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, h2_stream *stream)
+apr_status_t h2_mplx_c1_stream_cleanup(h2_mplx *m, h2_stream *stream,
+ unsigned int *pstream_count)
{
H2_MPLX_ENTER(m);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
H2_STRM_MSG(stream, "cleanup"));
- stream_cleanup(m, stream);
-
+ m_stream_cleanup(m, stream);
+ *pstream_count = h2_ihash_count(m->streams);
H2_MPLX_LEAVE(m);
return APR_SUCCESS;
}
-h2_stream *h2_mplx_stream_get(h2_mplx *m, int id)
+const h2_stream *h2_mplx_c2_stream_get(h2_mplx *m, int stream_id)
{
h2_stream *s = NULL;
H2_MPLX_ENTER_ALWAYS(m);
-
- s = h2_ihash_get(m->streams, id);
-
+ s = h2_ihash_get(m->streams, stream_id);
H2_MPLX_LEAVE(m);
+
return s;
}
-static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
-{
- h2_stream *stream = ctx;
- h2_mplx *m = stream->session->mplx;
-
- check_data_for(m, stream, 1);
-}
-static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
+static void c1_purge_streams(h2_mplx *m)
{
- apr_status_t status = APR_SUCCESS;
- h2_stream *stream = h2_ihash_get(m->streams, stream_id);
-
- if (!stream || !stream->task || m->aborted) {
- return APR_ECONNABORTED;
- }
-
- ap_assert(stream->output == NULL);
- stream->output = beam;
-
- if (APLOGctrace2(m->c)) {
- h2_beam_log(beam, m->c, APLOG_TRACE2, "out_open");
- }
- else {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
- "h2_mplx(%s): out open", stream->task->id);
- }
-
- h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, stream);
- h2_beam_on_produced(stream->output, output_produced, stream);
- if (stream->task->output.copy_files) {
- h2_beam_on_file_beam(stream->output, h2_beam_no_files, NULL);
- }
-
- /* we might see some file buckets in the output, see
- * if we have enough handles reserved. */
- check_data_for(m, stream, 0);
- return status;
-}
+ h2_stream *stream;
+ int i;
-apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
-{
- apr_status_t status;
-
- H2_MPLX_ENTER(m);
+ for (i = 0; i < m->spurge->nelts; ++i) {
+ stream = APR_ARRAY_IDX(m->spurge, i, h2_stream*);
+ ap_assert(stream->state == H2_SS_CLEANUP);
- if (m->aborted) {
- status = APR_ECONNABORTED;
- }
- else {
- status = out_open(m, stream_id, beam);
+ if (stream->input) {
+ h2_beam_destroy(stream->input, m->c1);
+ stream->input = NULL;
+ }
+ if (stream->c2) {
+ conn_rec *c2 = stream->c2;
+ h2_conn_ctx_t *c2_ctx = h2_conn_ctx_get(c2);
+ h2_c2_transit *transit;
+
+ stream->c2 = NULL;
+ ap_assert(c2_ctx);
+ transit = c2_ctx->transit;
+ h2_c2_destroy(c2); /* c2_ctx is gone as well */
+ if (transit) {
+ c2_transit_recycle(m, transit);
+ }
+ }
+ h2_stream_destroy(stream);
}
-
- H2_MPLX_LEAVE(m);
- return status;
+ apr_array_clear(m->spurge);
}
-static apr_status_t out_close(h2_mplx *m, h2_task *task)
+void h2_mplx_c1_going_keepalive(h2_mplx *m)
{
- apr_status_t status = APR_SUCCESS;
- h2_stream *stream;
-
- if (!task) {
- return APR_ECONNABORTED;
- }
- if (task->c) {
- ++task->c->keepalives;
- }
-
- stream = h2_ihash_get(m->streams, task->stream_id);
- if (!stream) {
- return APR_ECONNABORTED;
+ H2_MPLX_ENTER_ALWAYS(m);
+ if (m->spurge->nelts) {
+ c1_purge_streams(m);
}
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
- "h2_mplx(%s): close", task->id);
- status = h2_beam_close(task->output.beam);
- h2_beam_log(task->output.beam, m->c, APLOG_TRACE2, "out_close");
- output_consumed_signal(m, task);
- check_data_for(m, stream, 0);
- return status;
+ H2_MPLX_LEAVE(m);
}
-apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
- apr_thread_cond_t *iowait)
+apr_status_t h2_mplx_c1_poll(h2_mplx *m, apr_interval_time_t timeout,
+ stream_ev_callback *on_stream_input,
+ stream_ev_callback *on_stream_output,
+ void *on_ctx)
{
- apr_status_t status;
-
+ apr_status_t rv;
+
H2_MPLX_ENTER(m);
if (m->aborted) {
- status = APR_ECONNABORTED;
- }
- else if (h2_mplx_has_master_events(m)) {
- status = APR_SUCCESS;
- }
- else {
- purge_streams(m, 0);
- h2_ihash_iter(m->streams, report_consumption_iter, m);
- m->added_output = iowait;
- status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
- if (APLOGctrace2(m->c)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): trywait on data for %f ms)",
- m->id, timeout/1000.0);
- }
- m->added_output = NULL;
+ rv = APR_ECONNABORTED;
+ goto cleanup;
+ }
+ /* Purge (destroy) streams outside of pollset processing.
+ * Streams that are registered in the pollset, will be removed
+ * when they are destroyed, but the pollset works on copies
+ * of these registrations. So, if we destroy streams while
+ * processing pollset events, we might access freed memory.
+ */
+ if (m->spurge->nelts) {
+ c1_purge_streams(m);
}
+ rv = mplx_pollset_poll(m, timeout, on_stream_input, on_stream_output, on_ctx);
+cleanup:
H2_MPLX_LEAVE(m);
- return status;
-}
-
-static void check_data_for(h2_mplx *m, h2_stream *stream, int lock)
-{
- if (h2_ififo_push(m->readyq, stream->id) == APR_SUCCESS) {
- apr_atomic_set32(&m->event_pending, 1);
- H2_MPLX_ENTER_MAYBE(m, lock);
- if (m->added_output) {
- apr_thread_cond_signal(m->added_output);
- }
- H2_MPLX_LEAVE_MAYBE(m, lock);
- }
+ return rv;
}
-apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
+apr_status_t h2_mplx_c1_reprioritize(h2_mplx *m, h2_stream_pri_cmp_fn *cmp,
+ h2_session *session)
{
apr_status_t status;
@@ -653,9 +651,9 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
status = APR_ECONNABORTED;
}
else {
- h2_iq_sort(m->q, cmp, ctx);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): reprioritize tasks", m->id);
+ h2_iq_sort(m->q, cmp, session);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
+ H2_MPLX_MSG(m, "reprioritize streams"));
status = APR_SUCCESS;
}
@@ -663,560 +661,435 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
return status;
}
-static void register_if_needed(h2_mplx *m)
-{
- if (!m->aborted && !m->is_registered && !h2_iq_empty(m->q)) {
- apr_status_t status = h2_workers_register(m->workers, m);
- if (status == APR_SUCCESS) {
- m->is_registered = 1;
- }
- else {
- ap_log_cerror(APLOG_MARK, APLOG_ERR, status, m->c, APLOGNO(10021)
- "h2_mplx(%ld): register at workers", m->id);
- }
- }
-}
-
-apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
- h2_stream_pri_cmp *cmp, void *ctx)
+/* Schedule one stream for processing.
+ *
+ * Returns APR_ECONNABORTED when the mplx is aborted and APR_EINVAL when the
+ * stream carries no request. Otherwise the stream is flagged as scheduled
+ * and added to m->streams. A stream that is already ready is left at that;
+ * any other stream is prepared via h2_stream_prepare_processing() and its
+ * id is added to m->q, ordered by `cmp` with `session` as comparison context.
+ * A failing prepare step is returned as is; the stream then stays in
+ * m->streams with its scheduled flag set.
+ *
+ * No locking is done here: h2_mplx_c1_process() calls this while holding
+ * the mplx lock.
+ */
+static apr_status_t c1_process_stream(h2_mplx *m,
+ h2_stream *stream,
+ h2_stream_pri_cmp_fn *cmp,
+ h2_session *session)
{
- apr_status_t status;
-
- H2_MPLX_ENTER(m);
+ apr_status_t rv = APR_SUCCESS;
if (m->aborted) {
- status = APR_ECONNABORTED;
+ rv = APR_ECONNABORTED;
+ goto cleanup;
+ }
+ if (!stream->request) {
+ rv = APR_EINVAL;
+ goto cleanup;
+ }
+ if (APLOGctrace1(m->c1)) {
+ const h2_request *r = stream->request;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
+ H2_STRM_MSG(stream, "process %s%s%s %s%s%s%s"),
+ r->protocol? r->protocol : "",
+ r->protocol? " " : "",
+ r->method, r->scheme? r->scheme : "",
+ r->scheme? "://" : "",
+ r->authority, r->path? r->path: "");
+ }
+
+ stream->scheduled = 1;
+ h2_ihash_add(m->streams, stream);
+ if (h2_stream_is_ready(stream)) {
+ /* already have a response */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
+ H2_STRM_MSG(stream, "process, ready already"));
}
else {
- status = APR_SUCCESS;
- h2_ihash_add(m->streams, stream);
- if (h2_stream_is_ready(stream)) {
- /* already have a response */
- check_data_for(m, stream, 0);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- H2_STRM_MSG(stream, "process, add to readyq"));
- }
- else {
- h2_iq_add(m->q, stream->id, cmp, ctx);
- register_if_needed(m);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- H2_STRM_MSG(stream, "process, added to q"));
- }
+ /* last chance to set anything up before stream is processed
+ * by worker threads. */
+ rv = h2_stream_prepare_processing(stream);
+ if (APR_SUCCESS != rv) goto cleanup;
+ h2_iq_add(m->q, stream->id, cmp, session);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
+ H2_STRM_MSG(stream, "process, added to q"));
}
- H2_MPLX_LEAVE(m);
- return status;
+cleanup:
+ return rv;
}
-static h2_task *next_stream_task(h2_mplx *m)
+/* Hand all streams queued in `ready_to_process` over to the mplx.
+ *
+ * Runs with the mplx lock held. Every id shifted from `ready_to_process`
+ * is resolved through get_stream(session, id) and scheduled with
+ * c1_process_stream(); a stream that fails to schedule is reset with
+ * H2_ERR_INTERNAL_ERROR, an id without a stream is only logged.
+ * If fewer than m->processing_limit streams are being processed and m->q
+ * is not empty, the mplx producer is activated at the workers. On return
+ * *pstream_count holds the number of entries in m->streams.
+ *
+ * NOTE(review): c1_process_stream() is passed session->mplx while the rest
+ * of this function uses `m`; presumably both are the same object - confirm.
+ */
+void h2_mplx_c1_process(h2_mplx *m,
+ h2_iqueue *ready_to_process,
+ h2_stream_get_fn *get_stream,
+ h2_stream_pri_cmp_fn *stream_pri_cmp,
+ h2_session *session,
+ unsigned int *pstream_count)
{
- h2_stream *stream;
+ apr_status_t rv;
int sid;
- while (!m->aborted && (m->tasks_active < m->limit_active)
- && (sid = h2_iq_shift(m->q)) > 0) {
-
- stream = h2_ihash_get(m->streams, sid);
- if (stream) {
- conn_rec *slave, **pslave;
- pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
- if (pslave) {
- slave = *pslave;
- slave->aborted = 0;
- }
- else {
- slave = h2_slave_create(m->c, stream->id, m->pool);
- }
-
- if (!stream->task) {
+ H2_MPLX_ENTER_ALWAYS(m);
- if (sid > m->max_stream_started) {
- m->max_stream_started = sid;
- }
- if (stream->input) {
- h2_beam_on_consumed(stream->input, stream_input_ev,
- stream_input_consumed, stream);
- }
-
- stream->task = h2_task_create(slave, stream->id,
- stream->request, m, stream->input,
- stream->session->s->timeout,
- m->stream_max_mem);
- if (!stream->task) {
- ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, slave,
- H2_STRM_LOG(APLOGNO(02941), stream,
- "create task"));
- return NULL;
- }
-
+ while ((sid = h2_iq_shift(ready_to_process)) > 0) {
+ h2_stream *stream = get_stream(session, sid);
+ if (stream) {
+ ap_assert(!stream->scheduled);
+ rv = c1_process_stream(session->mplx, stream, stream_pri_cmp, session);
+ if (APR_SUCCESS != rv) {
+ h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
}
-
- ++m->tasks_active;
- return stream->task;
- }
- }
- return NULL;
-}
-
-apr_status_t h2_mplx_pop_task(h2_mplx *m, h2_task **ptask)
-{
- apr_status_t rv = APR_EOF;
-
- *ptask = NULL;
- ap_assert(m);
- ap_assert(m->lock);
-
- if (APR_SUCCESS != (rv = apr_thread_mutex_lock(m->lock))) {
- return rv;
- }
-
- if (m->aborted) {
- rv = APR_EOF;
- }
- else {
- *ptask = next_stream_task(m);
- rv = (*ptask != NULL && !h2_iq_empty(m->q))? APR_EAGAIN : APR_SUCCESS;
- }
- if (APR_EAGAIN != rv) {
- m->is_registered = 0; /* h2_workers will discard this mplx */
- }
- H2_MPLX_LEAVE(m);
- return rv;
-}
-
-static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
-{
- h2_stream *stream;
-
- if (task->frozen) {
- /* this task was handed over to an engine for processing
- * and the original worker has finished. That means the
- * engine may start processing now. */
- h2_task_thaw(task);
- apr_thread_cond_broadcast(m->task_thawed);
- return;
- }
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): task(%s) done", m->id, task->id);
- out_close(m, task);
-
- if (ngn) {
- apr_off_t bytes = 0;
- h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
- bytes += h2_beam_get_buffered(task->output.beam);
- if (bytes > 0) {
- /* we need to report consumed and current buffered output
- * to the engine. The request will be streamed out or cancelled,
- * no more data is coming from it and the engine should update
- * its calculations before we destroy this information. */
- h2_req_engine_out_consumed(ngn, task->c, bytes);
- }
- }
-
- if (task->engine) {
- if (!m->aborted && !task->c->aborted
- && !h2_req_engine_is_shutdown(task->engine)) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(10022)
- "h2_mplx(%ld): task(%s) has not-shutdown "
- "engine(%s)", m->id, task->id,
- h2_req_engine_get_id(task->engine));
- }
- h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
- }
-
- task->worker_done = 1;
- task->done_at = apr_time_now();
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%s): request done, %f ms elapsed", task->id,
- (task->done_at - task->started_at) / 1000.0);
-
- if (task->started_at > m->last_idle_block) {
- /* this task finished without causing an 'idle block', e.g.
- * a block by flow control.
- */
- if (task->done_at- m->last_limit_change >= m->limit_change_interval
- && m->limit_active < m->max_active) {
- /* Well behaving stream, allow it more workers */
- m->limit_active = H2MIN(m->limit_active * 2,
- m->max_active);
- m->last_limit_change = task->done_at;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): increase worker limit to %d",
- m->id, m->limit_active);
- }
- }
-
- stream = h2_ihash_get(m->streams, task->stream_id);
- if (stream) {
- /* stream not done yet. */
- if (!m->aborted && h2_ihash_get(m->sredo, stream->id)) {
- /* reset and schedule again */
- h2_task_redo(task);
- h2_ihash_remove(m->sredo, stream->id);
- h2_iq_add(m->q, stream->id, NULL, NULL);
}
else {
- /* stream not cleaned up, stay around */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- H2_STRM_MSG(stream, "task_done, stream open"));
- if (stream->input) {
- h2_beam_leave(stream->input);
- }
-
- /* more data will not arrive, resume the stream */
- check_data_for(m, stream, 0);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
+ H2_MPLX_MSG(m, "stream %d not found to process"), sid);
}
}
- else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) {
- /* stream is done, was just waiting for this. */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- H2_STRM_MSG(stream, "task_done, in hold"));
- if (stream->input) {
- h2_beam_leave(stream->input);
+ if ((m->processing_count < m->processing_limit) && !h2_iq_empty(m->q)) {
+ /* the mplx lock is released around the activation call and re-taken
+ * afterwards; c2_prod_next()/c2_prod_done() take the same lock. */
+ H2_MPLX_LEAVE(m);
+ rv = h2_workers_activate(m->workers, m->producer);
+ H2_MPLX_ENTER_ALWAYS(m);
+ if (rv != APR_SUCCESS) {
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1, APLOGNO(10021)
+ H2_MPLX_MSG(m, "activate at workers"));
}
- stream_joined(m, stream);
- }
- else if ((stream = h2_ihash_get(m->spurge, task->stream_id)) != NULL) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
- H2_STRM_LOG(APLOGNO(03517), stream, "already in spurge"));
- ap_assert("stream should not be in spurge" == NULL);
}
- else {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03518)
- "h2_mplx(%s): task_done, stream not found",
- task->id);
- ap_assert("stream should still be available" == NULL);
- }
-}
+ *pstream_count = h2_ihash_count(m->streams);
-void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
-{
- H2_MPLX_ENTER_ALWAYS(m);
+#if APR_POOL_DEBUG
+ /* pool debugging builds only: log the byte counts of the child, mplx,
+ * session and c1 pools. */
+ do {
+ apr_size_t mem_g, mem_m, mem_s, mem_c1;
- task_done(m, task, NULL);
- --m->tasks_active;
-
- if (m->join_wait) {
- apr_thread_cond_signal(m->join_wait);
- }
- if (ptask) {
- /* caller wants another task */
- *ptask = next_stream_task(m);
- }
- register_if_needed(m);
+ mem_g = pchild? apr_pool_num_bytes(pchild, 1) : 0;
+ mem_m = apr_pool_num_bytes(m->pool, 1);
+ mem_s = apr_pool_num_bytes(session->pool, 1);
+ mem_c1 = apr_pool_num_bytes(m->c1->pool, 1);
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c1,
+ H2_MPLX_MSG(m, "child mem=%ld, mplx mem=%ld, session mem=%ld, c1=%ld"),
+ (long)mem_g, (long)mem_m, (long)mem_s, (long)mem_c1);
+
+ } while (0);
+#endif
H2_MPLX_LEAVE(m);
}
-/*******************************************************************************
- * h2_mplx DoS protection
- ******************************************************************************/
+/* Beam callback; c2_setup_io() registers it with h2_beam_on_send() on the
+ * stream input beam and also calls it directly when that beam already holds
+ * data. `ctx` is the c2 connection. If the connection has a context with a
+ * stream id and an input pipe, one byte is written to the H2_PIPE_IN end of
+ * that pipe (presumably to wake whoever polls the other end). The result of
+ * the write is not checked. */
+static void c2_beam_input_write_notify(void *ctx, h2_bucket_beam *beam)
+{
+ conn_rec *c = ctx;
+ h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c);
-static int latest_repeatable_unsubmitted_iter(void *data, void *val)
+ (void)beam;
+ if (conn_ctx && conn_ctx->stream_id && conn_ctx->pipe_in[H2_PIPE_IN]) {
+ apr_file_putc(1, conn_ctx->pipe_in[H2_PIPE_IN]);
+ }
+}
+
+/* Record `stream_id` in the event queue `q` while holding m->poll_lock.
+ * The pollset is woken only when the append reports a non-zero result and
+ * the queue then holds exactly one entry, i.e. the id is the first event
+ * in a previously empty queue. */
+static void add_stream_poll_event(h2_mplx *m, int stream_id, h2_iqueue *q)
{
- stream_iter_ctx *ctx = data;
- h2_stream *stream = val;
-
- if (stream->task && !stream->task->worker_done
- && h2_task_can_redo(stream->task)
- && !h2_ihash_get(ctx->m->sredo, stream->id)) {
- if (!h2_stream_is_ready(stream)) {
- /* this task occupies a worker, the response has not been submitted
- * yet, not been cancelled and it is a repeatable request
- * -> it can be re-scheduled later */
- if (!ctx->stream
- || (ctx->stream->task->started_at < stream->task->started_at)) {
- /* we did not have one or this one was started later */
- ctx->stream = stream;
- }
- }
+ apr_thread_mutex_lock(m->poll_lock);
+ if (h2_iq_append(q, stream_id) && h2_iq_count(q) == 1) {
+ /* newly added first */
+ apr_pollset_wakeup(m->pollset);
}
- return 1;
+ apr_thread_mutex_unlock(m->poll_lock);
}
-static h2_stream *get_latest_repeatable_unsubmitted_stream(h2_mplx *m)
+/* Beam callback; c2_setup_io() registers it with h2_beam_on_received() on
+ * the stream input beam. `ctx` is the c2 connection; `beam` is not used.
+ * When the connection has a context with a stream id, that id is queued in
+ * the mplx's streams_input_read event queue. */
+static void c2_beam_input_read_notify(void *ctx, h2_bucket_beam *beam)
{
- stream_iter_ctx ctx;
- ctx.m = m;
- ctx.stream = NULL;
- h2_ihash_iter(m->streams, latest_repeatable_unsubmitted_iter, &ctx);
- return ctx.stream;
+ conn_rec *c = ctx;
+ h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c);
+
+ if (conn_ctx && conn_ctx->stream_id) {
+ add_stream_poll_event(conn_ctx->mplx, conn_ctx->stream_id,
+ conn_ctx->mplx->streams_input_read);
+ }
}
-static int timed_out_busy_iter(void *data, void *val)
+/* Beam callback; c2_setup_io() registers it with h2_beam_on_eagain() on the
+ * stream input beam. `ctx` is the c2 connection; `beam` is not used.
+ * The registration itself is not conditional on H2_USE_PIPES; without a
+ * pipe the pipe_in slot is NULL and this callback does nothing. */
+static void c2_beam_input_read_eagain(void *ctx, h2_bucket_beam *beam)
{
- stream_iter_ctx *ctx = data;
- h2_stream *stream = val;
- if (stream->task && !stream->task->worker_done
- && (ctx->now - stream->task->started_at) > stream->task->timeout) {
- /* timed out stream occupying a worker, found */
- ctx->stream = stream;
- return 0;
+ conn_rec *c = ctx;
+ h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c);
+ /* installed in the input bucket beams when we use pipes.
+ * Drain the pipe just before the beam returns APR_EAGAIN.
+ * A clean state for allowing polling on the pipe to rest
+ * when the beam is empty */
+ if (conn_ctx && conn_ctx->pipe_in[H2_PIPE_OUT]) {
+ h2_util_drain_pipe(conn_ctx->pipe_in[H2_PIPE_OUT]);
}
- return 1;
}
-static h2_stream *get_timed_out_busy_stream(h2_mplx *m)
+/* Beam callback; c2_setup_io() registers it with h2_beam_on_was_empty() on
+ * the c2 output beam, and s_c2_done() calls it directly with a NULL beam.
+ * `ctx` is the c2 connection; `beam` is not used. When the connection has
+ * a context with a stream id, that id is queued in the mplx's
+ * streams_output_written event queue. */
+static void c2_beam_output_write_notify(void *ctx, h2_bucket_beam *beam)
{
- stream_iter_ctx ctx;
- ctx.m = m;
- ctx.stream = NULL;
- ctx.now = apr_time_now();
- h2_ihash_iter(m->streams, timed_out_busy_iter, &ctx);
- return ctx.stream;
+ conn_rec *c = ctx;
+ h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c);
+
+ if (conn_ctx && conn_ctx->stream_id) {
+ add_stream_poll_event(conn_ctx->mplx, conn_ctx->stream_id,
+ conn_ctx->mplx->streams_output_written);
+ }
}
-static apr_status_t unschedule_slow_tasks(h2_mplx *m)
+/* Wire up input and output of a secondary connection for `stream`.
+ *
+ * Initializes the c2 connection context, creates the output beam when the
+ * context has none yet (buffer size m->stream_max_mem, "was empty" callback
+ * c2_beam_output_write_notify), and, if the stream has an input beam,
+ * attaches it as the c2 input and registers the send/received/consumed/
+ * eagain callbacks on it. With H2_USE_PIPES an input pipe is created from
+ * c2->pool as well.
+ *
+ * On success stream->output is set to the output beam; on any failure it is
+ * set to NULL, the failing step is logged and the error is returned.
+ *
+ * NOTE(review): when pipe creation fails, the callbacks registered on
+ * stream->input just before still carry `c2` as their context - confirm
+ * they cannot fire after the caller has destroyed that c2.
+ */
+static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream, h2_c2_transit *transit)
{
- h2_stream *stream;
- int n;
-
- /* Try to get rid of streams that occupy workers. Look for safe requests
- * that are repeatable. If none found, fail the connection.
- */
- n = (m->tasks_active - m->limit_active - (int)h2_ihash_count(m->sredo));
- while (n > 0 && (stream = get_latest_repeatable_unsubmitted_stream(m))) {
- h2_task_rst(stream->task, H2_ERR_CANCEL);
- h2_ihash_add(m->sredo, stream);
- --n;
+ h2_conn_ctx_t *conn_ctx;
+ apr_status_t rv = APR_SUCCESS;
+ const char *action = "init";
+
+ rv = h2_conn_ctx_init_for_c2(&conn_ctx, c2, m, stream, transit);
+ if (APR_SUCCESS != rv) goto cleanup;
+
+ if (!conn_ctx->beam_out) {
+ action = "create output beam";
+ rv = h2_beam_create(&conn_ctx->beam_out, c2, conn_ctx->req_pool,
+ stream->id, "output", 0, c2->base_server->timeout);
+ if (APR_SUCCESS != rv) goto cleanup;
+
+ h2_beam_buffer_size_set(conn_ctx->beam_out, m->stream_max_mem);
+ h2_beam_on_was_empty(conn_ctx->beam_out, c2_beam_output_write_notify, c2);
}
-
- if ((m->tasks_active - h2_ihash_count(m->sredo)) > m->limit_active) {
- h2_stream *stream = get_timed_out_busy_stream(m);
- if (stream) {
- /* Too many busy workers, unable to cancel enough streams
- * and with a busy, timed out stream, we tell the client
- * to go away... */
- return APR_TIMEUP;
- }
+
+ /* start without pipe ends; they are only set in the H2_USE_PIPES block */
+ memset(&conn_ctx->pipe_in, 0, sizeof(conn_ctx->pipe_in));
+ if (stream->input) {
+ conn_ctx->beam_in = stream->input;
+ h2_beam_on_send(stream->input, c2_beam_input_write_notify, c2);
+ h2_beam_on_received(stream->input, c2_beam_input_read_notify, c2);
+ h2_beam_on_consumed(stream->input, c1_input_consumed, stream);
+#if H2_USE_PIPES
+ action = "create input write pipe";
+ rv = apr_file_pipe_create_pools(&conn_ctx->pipe_in[H2_PIPE_OUT],
+ &conn_ctx->pipe_in[H2_PIPE_IN],
+ APR_READ_BLOCK,
+ c2->pool, c2->pool);
+ if (APR_SUCCESS != rv) goto cleanup;
+#endif
+ h2_beam_on_eagain(stream->input, c2_beam_input_read_eagain, c2);
+ /* input that arrived before the callbacks existed is announced now */
+ if (!h2_beam_empty(stream->input))
+ c2_beam_input_write_notify(c2, stream->input);
+ }
+
+cleanup:
+ /* conn_ctx is only read here when rv is APR_SUCCESS */
+ stream->output = (APR_SUCCESS == rv)? conn_ctx->beam_out : NULL;
+ if (APR_SUCCESS != rv) {
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, c2,
+ H2_STRM_LOG(APLOGNO(10309), stream,
+ "error %s"), action);
}
- return APR_SUCCESS;
+ return rv;
}
-apr_status_t h2_mplx_idle(h2_mplx *m)
+/* Produce the next secondary connection to process, or NULL.
+ *
+ * While the mplx is not aborted and fewer than m->processing_limit streams
+ * are being processed, stream ids are shifted from m->q until one resolves
+ * to a stream in m->streams. For that stream a c2 connection is created
+ * from a transit, its I/O is set up via c2_setup_io(), the connection is
+ * stored in stream->c2 and m->processing_count is incremented.
+ *
+ * Returns NULL when no stream is available, when the limit is reached, or
+ * when c2 creation/setup fails; in the failure case the c2 is destroyed and
+ * the transit is recycled. c2_prod_next() calls this with the mplx lock
+ * held.
+ *
+ * NOTE(review): on a creation/setup failure the stream id has already been
+ * shifted out of m->q and is not re-queued here - confirm that the stream
+ * is dealt with elsewhere.
+ */
+static conn_rec *s_next_c2(h2_mplx *m)
{
- apr_status_t status = APR_SUCCESS;
- apr_time_t now;
- apr_size_t scount;
-
- H2_MPLX_ENTER(m);
+ h2_stream *stream = NULL;
+ apr_status_t rv = APR_SUCCESS;
+ apr_uint32_t sid;
+ conn_rec *c2 = NULL;
+ h2_c2_transit *transit = NULL;
- scount = h2_ihash_count(m->streams);
- if (scount > 0) {
- if (m->tasks_active) {
- /* If we have streams in connection state 'IDLE', meaning
- * all streams are ready to sent data out, but lack
- * WINDOW_UPDATEs.
- *
- * This is ok, unless we have streams that still occupy
- * h2 workers. As worker threads are a scarce resource,
- * we need to take measures that we do not get DoSed.
- *
- * This is what we call an 'idle block'. Limit the amount
- * of busy workers we allow for this connection until it
- * well behaves.
- */
- now = apr_time_now();
- m->last_idle_block = now;
- if (m->limit_active > 2
- && now - m->last_limit_change >= m->limit_change_interval) {
- if (m->limit_active > 16) {
- m->limit_active = 16;
- }
- else if (m->limit_active > 8) {
- m->limit_active = 8;
- }
- else if (m->limit_active > 4) {
- m->limit_active = 4;
- }
- else if (m->limit_active > 2) {
- m->limit_active = 2;
- }
- m->last_limit_change = now;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): decrease worker limit to %d",
- m->id, m->limit_active);
- }
-
- if (m->tasks_active > m->limit_active) {
- status = unschedule_slow_tasks(m);
- }
- }
- else if (!h2_iq_empty(m->q)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): idle, but %d streams to process",
- m->id, (int)h2_iq_count(m->q));
- status = APR_EAGAIN;
- }
- else {
- /* idle, have streams, but no tasks active. what are we waiting for?
- * WINDOW_UPDATEs from client? */
- h2_stream *stream = NULL;
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): idle, no tasks ongoing, %d streams",
- m->id, (int)h2_ihash_count(m->streams));
- h2_ihash_shift(m->streams, (void**)&stream, 1);
- if (stream) {
- h2_ihash_add(m->streams, stream);
- if (stream->output && !stream->out_checked) {
- /* FIXME: this looks like a race between the session thinking
- * it is idle and the EOF on a stream not being sent.
- * Signal to caller to leave IDLE state.
- */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- H2_STRM_MSG(stream, "output closed=%d, mplx idle"
- ", out has %ld bytes buffered"),
- h2_beam_is_closed(stream->output),
- (long)h2_beam_get_buffered(stream->output));
- h2_ihash_add(m->streams, stream);
- check_data_for(m, stream, 0);
- stream->out_checked = 1;
- status = APR_EAGAIN;
- }
- }
+ /* ids in m->q whose stream is no longer in m->streams are skipped */
+ while (!m->aborted && !stream && (m->processing_count < m->processing_limit)
+ && (sid = h2_iq_shift(m->q)) > 0) {
+ stream = h2_ihash_get(m->streams, sid);
+ }
+
+ if (!stream) {
+ if (m->processing_count >= m->processing_limit && !h2_iq_empty(m->q)) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1,
+ H2_MPLX_MSG(m, "delaying request processing. "
+ "Current limit is %d and %d workers are in use."),
+ m->processing_limit, m->processing_count);
}
+ goto cleanup;
}
- register_if_needed(m);
- H2_MPLX_LEAVE(m);
- return status;
-}
+ if (sid > m->max_stream_id_started) {
+ m->max_stream_id_started = sid;
+ }
-/*******************************************************************************
- * HTTP/2 request engines
- ******************************************************************************/
+ transit = c2_transit_get(m);
+#if AP_HAS_RESPONSE_BUCKETS
+ c2 = ap_create_secondary_connection(transit->pool, m->c1, transit->bucket_alloc);
+#else
+ c2 = h2_c2_create(m->c1, transit->pool, transit->bucket_alloc);
+#endif
+ if (!c2) goto cleanup;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c1,
+ H2_STRM_MSG(stream, "created new c2"));
-typedef struct {
- h2_mplx * m;
- h2_req_engine *ngn;
- int streams_updated;
-} ngn_update_ctx;
+ rv = c2_setup_io(m, c2, stream, transit);
+ if (APR_SUCCESS != rv) goto cleanup;
-static int ngn_update_window(void *ctx, void *val)
-{
- ngn_update_ctx *uctx = ctx;
- h2_stream *stream = val;
- if (stream->task && stream->task->assigned == uctx->ngn
- && output_consumed_signal(uctx->m, stream->task)) {
- ++uctx->streams_updated;
+ stream->c2 = c2;
+ ++m->processing_count;
+
+cleanup:
+ /* a c2 whose setup failed is destroyed; the caller then gets NULL */
+ if (APR_SUCCESS != rv && c2) {
+ h2_c2_destroy(c2);
+ c2 = NULL;
}
- return 1;
+ /* a transit that did not end up in a returned c2 is recycled */
+ if (transit && !c2) {
+ c2_transit_recycle(m, transit);
+ }
+ return c2;
}
-static apr_status_t ngn_out_update_windows(h2_mplx *m, h2_req_engine *ngn)
+/* Producer callback handing out the next secondary connection.
+ *
+ * `baton` is the h2_mplx. Under the mplx lock, and unless the mplx is
+ * aborted, the next c2 is taken from s_next_c2(). Returns that connection
+ * or NULL when there is none. *phas_more is always written: non-zero only
+ * when a connection is returned and m->q still holds further stream ids.
+ */
+static conn_rec *c2_prod_next(void *baton, int *phas_more)
{
- ngn_update_ctx ctx;
-
- ctx.m = m;
- ctx.ngn = ngn;
- ctx.streams_updated = 0;
- h2_ihash_iter(m->streams, ngn_update_window, &ctx);
-
- return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN;
+ h2_mplx *m = baton;
+ conn_rec *c = NULL;
+
+ /* define the out-parameter on every path, including the aborted one,
+ * so the caller never reads an indeterminate value */
+ *phas_more = 0;
+ H2_MPLX_ENTER_ALWAYS(m);
+ if (!m->aborted) {
+ c = s_next_c2(m);
+ *phas_more = (c != NULL && !h2_iq_empty(m->q));
+ }
+ H2_MPLX_LEAVE(m);
+ return c;
}
-apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
- request_rec *r,
- http2_req_engine_init *einit)
+/* Book-keeping for a secondary connection whose processing has finished.
+ *
+ * Marks the connection context as done, records the finish time, bumps
+ * c2->keepalives and hands c1's scoreboard handle to c2. A c2 that ended
+ * without a final response, or whose output beam is missing or incomplete,
+ * is flagged as aborted and its output beam (if any) is aborted as well;
+ * a cleanly finished, non-aborted c2 improves the mplx mood via
+ * s_mplx_be_happy().
+ *
+ * Afterwards the stream is looked up: if still in m->streams, output
+ * polling is triggered; if in m->shold, the stream is joined. A stream
+ * found in neither is a programming error and trips an assertion.
+ * c2_prod_done() calls this with the mplx lock held.
+ */
+static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx)
{
- apr_status_t status;
- h2_mplx *m;
- h2_task *task;
h2_stream *stream;
+
+ ap_assert(conn_ctx);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c2,
+ "h2_mplx(%s-%d): c2 done", conn_ctx->id, conn_ctx->stream_id);
+
+ AP_DEBUG_ASSERT(apr_atomic_read32(&conn_ctx->done) == 0);
+ apr_atomic_set32(&conn_ctx->done, 1);
+ conn_ctx->done_at = apr_time_now();
+ ++c2->keepalives;
+ /* From here on, the final handling of c2 is done by c1 processing.
+ * Which means we can give it c1's scoreboard handle for updates. */
+ c2->sbh = m->c1->sbh;
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c2,
+ "h2_mplx(%s-%d): request done, %f ms elapsed",
+ conn_ctx->id, conn_ctx->stream_id,
+ (conn_ctx->done_at - conn_ctx->started_at) / 1000.0);
- task = h2_ctx_rget_task(r);
- if (!task) {
- return APR_ECONNABORTED;
+ if (!conn_ctx->has_final_response) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, conn_ctx->last_err, c2,
+ "h2_c2(%s-%d): processing finished without final response",
+ conn_ctx->id, conn_ctx->stream_id);
+ c2->aborted = 1;
+ if (conn_ctx->beam_out)
+ h2_beam_abort(conn_ctx->beam_out, c2);
+ }
+ else if (!conn_ctx->beam_out || !h2_beam_is_complete(conn_ctx->beam_out)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, conn_ctx->last_err, c2,
+ "h2_c2(%s-%d): processing finished with incomplete output",
+ conn_ctx->id, conn_ctx->stream_id);
+ c2->aborted = 1;
+ /* this branch is also taken when there is no output beam at all,
+ * so only abort a beam that exists */
+ if (conn_ctx->beam_out)
+ h2_beam_abort(conn_ctx->beam_out, c2);
+ }
+ else if (!c2->aborted) {
+ s_mplx_be_happy(m, c2, conn_ctx);
}
- m = task->mplx;
- H2_MPLX_ENTER(m);
-
- stream = h2_ihash_get(m->streams, task->stream_id);
+ stream = h2_ihash_get(m->streams, conn_ctx->stream_id);
if (stream) {
- status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit);
+ /* stream not done yet. trigger a potential polling on the output
+ * since nothing more will be happening here. */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c2,
+ H2_STRM_MSG(stream, "c2_done, stream open"));
+ c2_beam_output_write_notify(c2, NULL);
}
- else {
- status = APR_ECONNABORTED;
+ else if ((stream = h2_ihash_get(m->shold, conn_ctx->stream_id)) != NULL) {
+ /* stream is done, was just waiting for this. */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c2,
+ H2_STRM_MSG(stream, "c2_done, in hold"));
+ c1c2_stream_joined(m, stream);
}
+ else {
+ int i;
+
+ /* `stream` is NULL at this point, so the purge list has to be
+ * searched by stream id, not by pointer */
+ for (i = 0; i < m->spurge->nelts; ++i) {
+ h2_stream *purged = APR_ARRAY_IDX(m->spurge, i, h2_stream*);
+ if (purged && purged->id == conn_ctx->stream_id) {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c2,
+ H2_STRM_LOG(APLOGNO(03517), purged, "already in spurge"));
+ ap_assert("stream should not be in spurge" == NULL);
+ return;
+ }
+ }
- H2_MPLX_LEAVE(m);
- return status;
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c2, APLOGNO(03518)
+ "h2_mplx(%s-%d): c2_done, stream not found",
+ conn_ctx->id, conn_ctx->stream_id);
+ ap_assert("stream should still be available" == NULL);
+ }
}
-apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn,
- apr_read_type_e block,
- int capacity,
- request_rec **pr)
-{
- h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
- h2_mplx *m = h2_ngn_shed_get_ctx(shed);
- apr_status_t status;
- int want_shutdown;
-
- H2_MPLX_ENTER(m);
+/* Producer callback invoked with a secondary connection that has finished
+ * processing. `baton` is the h2_mplx. Under the mplx lock it decrements
+ * m->processing_count, runs s_c2_done() for the connection and, if
+ * m->join_wait is set, signals that condition. The connection context of
+ * c2 is only checked by a debug assertion here; s_c2_done() asserts it. */
+static void c2_prod_done(void *baton, conn_rec *c2)
+{
+ h2_mplx *m = baton;
+ h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c2);
- want_shutdown = (block == APR_BLOCK_READ);
+ AP_DEBUG_ASSERT(conn_ctx);
+ H2_MPLX_ENTER_ALWAYS(m);
- /* Take this opportunity to update output consummation
- * for this engine */
- ngn_out_update_windows(m, ngn);
-
- if (want_shutdown && !h2_iq_empty(m->q)) {
- /* For a blocking read, check first if requests are to be
- * had and, if not, wait a short while before doing the
- * blocking, and if unsuccessful, terminating read.
- */
- status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
- if (APR_STATUS_IS_EAGAIN(status)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): start block engine pull", m->id);
- apr_thread_cond_timedwait(m->task_thawed, m->lock,
- apr_time_from_msec(20));
- status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
- }
- }
- else {
- status = h2_ngn_shed_pull_request(shed, ngn, capacity,
- want_shutdown, pr);
- }
+ --m->processing_count;
+ s_c2_done(m, c2, conn_ctx);
+ if (m->join_wait) apr_thread_cond_signal(m->join_wait);
H2_MPLX_LEAVE(m);
- return status;
}
-
-void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn,
- apr_status_t status)
+
+/* Callback for a shutdown of the workers. `baton` is the h2_mplx.
+ * Under m->poll_lock it sets m->shutdown, additionally sets m->aborted
+ * when the shutdown is not graceful, and wakes the pollset.
+ * NOTE(review): the flags are written under poll_lock, not the mplx lock -
+ * confirm that readers of m->aborted tolerate this. */
+static void workers_shutdown(void *baton, int graceful)
{
- h2_task *task = h2_ctx_cget_task(r_conn);
-
- if (task) {
- h2_mplx *m = task->mplx;
- h2_stream *stream;
+ h2_mplx *m = baton;
+
+ apr_thread_mutex_lock(m->poll_lock);
+ /* time to wakeup and assess what to do */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
+ H2_MPLX_MSG(m, "workers shutdown, waking pollset"));
+ m->shutdown = 1;
+ if (!graceful) {
+ m->aborted = 1;
+ }
+ apr_pollset_wakeup(m->pollset);
+ apr_thread_mutex_unlock(m->poll_lock);
+}
- H2_MPLX_ENTER_ALWAYS(m);
+/*******************************************************************************
+ * h2_mplx DoS protection
+ ******************************************************************************/
- stream = h2_ihash_get(m->streams, task->stream_id);
-
- ngn_out_update_windows(m, ngn);
- h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
-
- if (status != APR_SUCCESS && stream
- && h2_task_can_redo(task)
- && !h2_ihash_get(m->sredo, stream->id)) {
- h2_ihash_add(m->sredo, stream);
- }
+/* Improve the mplx mood after a secondary connection finished cleanly.
+ *
+ * Only acts while the processing limit is below its maximum and the
+ * connection was started after the last mood change. Each call lowers
+ * m->irritations_since by one. The limit is doubled (capped at
+ * m->processing_max) once m->mood_update_interval has passed since the last
+ * change, or once irritations_since has dropped below -processing_limit;
+ * the change time and the irritation counter are reset then. `c` is only
+ * used for logging. s_c2_done() is the caller.
+ */
+static void s_mplx_be_happy(h2_mplx *m, conn_rec *c, h2_conn_ctx_t *conn_ctx)
+{
+ apr_time_t now;
- if (task->engine) {
- /* cannot report that as done until engine returns */
- }
- else {
- task_done(m, task, ngn);
+ if (m->processing_limit < m->processing_max
+ && conn_ctx->started_at > m->last_mood_change) {
+ --m->irritations_since;
+ /* `now` is assigned in the first operand of the ||, which is always
+ * evaluated, so it is set whenever the body below runs */
+ if (m->processing_limit < m->processing_max
+ && ((now = apr_time_now()) - m->last_mood_change >= m->mood_update_interval
+ || m->irritations_since < -m->processing_limit)) {
+ m->processing_limit = H2MIN(m->processing_limit * 2, m->processing_max);
+ m->last_mood_change = now;
+ m->irritations_since = 0;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+ H2_MPLX_MSG(m, "mood update, increasing worker limit to %d"),
+ m->processing_limit);
}
+ }
+}
- H2_MPLX_LEAVE(m);
+/* Worsen the mplx mood.
+ *
+ * Only acts while the processing limit is above 2. Each call raises
+ * m->irritations_since by one. Once m->mood_update_interval has passed
+ * since the last mood change, or the irritation count has reached the
+ * current limit, the limit is lowered to the next smaller step out of
+ * 16, 8, 4, 2 and the change time and irritation counter are reset.
+ */
+static void m_be_annoyed(h2_mplx *m)
+{
+ apr_time_t now;
+
+ if (m->processing_limit > 2) {
+ ++m->irritations_since;
+ /* `now` is assigned in the first operand of the ||, which is always
+ * evaluated */
+ if (((now = apr_time_now()) - m->last_mood_change >= m->mood_update_interval)
+ || (m->irritations_since >= m->processing_limit)) {
+
+ if (m->processing_limit > 16) {
+ m->processing_limit = 16;
+ }
+ else if (m->processing_limit > 8) {
+ m->processing_limit = 8;
+ }
+ else if (m->processing_limit > 4) {
+ m->processing_limit = 4;
+ }
+ else if (m->processing_limit > 2) {
+ m->processing_limit = 2;
+ }
+ m->last_mood_change = now;
+ m->irritations_since = 0;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
+ H2_MPLX_MSG(m, "mood update, decreasing worker limit to %d"),
+ m->processing_limit);
+ }
}
}
@@ -1224,59 +1097,168 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn,
* mplx master events dispatching
******************************************************************************/
-int h2_mplx_has_master_events(h2_mplx *m)
+static int reset_is_acceptable(h2_stream *stream)
{
- return apr_atomic_read32(&m->event_pending) > 0;
+ /* client may terminate a stream via H2 RST_STREAM message at any time.
+ * This is annoying when we have committed resources (e.g. worker threads)
+ * to it, so our mood (e.g. willingness to commit resources on this
+ * connection in the future) goes down.
+ *
+ * This is a DoS protection. We do not want to make it too easy for
+ * a client to eat up server resources.
+ *
+ * However: there are cases where a RST_STREAM is the only way to end
+ * a request. This includes websockets and server-sent event streams (SSEs).
+ * The responses to such requests continue forever otherwise.
+ *
+ * Returns 1 when the reset is considered acceptable and 0 when it is
+ * not. The checks below are evaluated in order; the first one that
+ * matches decides the result.
+ */
+ if (!stream_is_running(stream)) return 1;
+ if (!(stream->id & 0x01)) return 1; /* stream initiated by us. acceptable. */
+ if (!stream->response) return 0; /* no response headers produced yet. bad. */
+ if (!stream->out_data_frames) return 0; /* no response body data sent yet. bad. */
+ return 1; /* otherwise, be forgiving */
}
-apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
- stream_ev_callback *on_resume,
- void *on_ctx)
+apr_status_t h2_mplx_c1_client_rst(h2_mplx *m, int stream_id, h2_stream *stream) /*
+ * Handle a RST_STREAM sent by the client for stream_id.
+ *
+ * m: the mplx; the work is done between H2_MPLX_ENTER_ALWAYS and
+ * H2_MPLX_LEAVE (presumably the mplx lock - the macros are defined
+ * outside this chunk).
+ * stream_id: id named in the reset.
+ * stream: the stream object for that id as known to the caller, or
+ * NULL when the caller no longer knows it.
+ *
+ * Three cases are distinguished: unknown stream (only logged), a stream
+ * not yet registered in m->streams (reset, cleaned up and counted as an
+ * irritation), and a registered stream whose reset is not acceptable
+ * (counted as an irritation).
+ *
+ * Returns status, which this function never changes from APR_SUCCESS.
+ */
{
- h2_stream *stream;
- int n, id;
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): dispatch events", m->id);
- apr_atomic_set32(&m->event_pending, 0);
+ apr_status_t status = APR_SUCCESS;
+ int registered; /* non-zero when m->streams has an entry for stream_id */
- /* update input windows for streams */
- h2_ihash_iter(m->streams, report_consumption_iter, m);
- purge_streams(m, 1);
-
- n = h2_ififo_count(m->readyq);
- while (n > 0
- && (h2_ififo_try_pull(m->readyq, &id) == APR_SUCCESS)) {
- --n;
- stream = h2_ihash_get(m->streams, id);
- if (stream) {
- on_resume(on_ctx, stream);
- }
+ H2_MPLX_ENTER_ALWAYS(m);
+ registered = (h2_ihash_get(m->streams, stream_id) != NULL);
+ if (!stream) {
+ /* a RST might arrive so late, we have already forgotten
+ * about it. Seems ok. */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1,
+ H2_MPLX_MSG(m, "RST on unknown stream %d"), stream_id);
+ AP_DEBUG_ASSERT(!registered); /* a stream unknown to the caller must not be in m->streams */
+ }
+ else if (!registered) {
+ /* a RST on a stream that mplx has not been told about, but
+ * which the session knows. Very early and annoying.
+ * The monitor is detached first, then the stream is reset,
+ * driven to EOS_SENT and handed to m_stream_cleanup(). */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1,
+ H2_STRM_MSG(stream, "very early RST, drop"));
+ h2_stream_set_monitor(stream, NULL);
+ h2_stream_rst(stream, H2_ERR_STREAM_CLOSED);
+ h2_stream_dispatch(stream, H2_SEV_EOS_SENT);
+ m_stream_cleanup(m, stream);
+ m_be_annoyed(m);
+ }
+ else if (!reset_is_acceptable(stream)) { /* registered stream, but the reset counts against the client */
+ m_be_annoyed(m);
}
-
- return APR_SUCCESS;
+ H2_MPLX_LEAVE(m);
+ return status;
}
-apr_status_t h2_mplx_keep_active(h2_mplx *m, h2_stream *stream)
+static apr_status_t mplx_pollset_create(h2_mplx *m)
{
- check_data_for(m, stream, 1);
- return APR_SUCCESS;
+ /* stream0 output only
+ *
+ * Creates m->pollset from m->pool with a size of 1 and the
+ * APR_POLLSET_WAKEABLE flag. The status of apr_pollset_create()
+ * is returned unchanged. */
+ return apr_pollset_create(&m->pollset, 1, m->pool,
+ APR_POLLSET_WAKEABLE);
}
-int h2_mplx_awaits_data(h2_mplx *m)
+static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
+ stream_ev_callback *on_stream_input,
+ stream_ev_callback *on_stream_output,
+ void *on_ctx) /*
+ * Collect pending stream events and dispatch them to the callbacks.
+ *
+ * m: the mplx. The function leaves it (H2_MPLX_LEAVE) around the
+ * blocking poll and around the callbacks and re-enters it
+ * afterwards, so the caller is presumably expected to have
+ * entered it - confirm against the callers.
+ * timeout: passed to apr_pollset_poll(); any negative value is
+ * passed as -1.
+ * on_stream_input: called for every stream in m->streams_ev_in; may be NULL.
+ * on_stream_output: called for every stream in m->streams_ev_out; may be NULL.
+ * on_ctx: opaque first argument handed to both callbacks.
+ *
+ * Stream ids queued in m->streams_input_read / m->streams_output_written
+ * take precedence: if either queue is non-empty, no poll is done. Ids
+ * that no longer resolve in m->streams are dropped.
+ *
+ * Returns APR_SUCCESS after dispatching, or the status of
+ * apr_pollset_poll() on timeout or failure. An interrupted poll during
+ * shutdown returns APR_SUCCESS unless m->aborted is set.
+ */
{
- int waiting = 1;
-
- H2_MPLX_ENTER_ALWAYS(m);
+ apr_status_t rv;
+ const apr_pollfd_t *results, *pfd;
+ apr_int32_t nresults, i;
+ h2_conn_ctx_t *conn_ctx;
+ h2_stream *stream;
- if (h2_ihash_empty(m->streams)) {
- waiting = 0;
- }
- else if (!m->tasks_active && !h2_ififo_count(m->readyq)
- && h2_iq_empty(m->q)) {
- waiting = 0;
- }
+ /* Make sure we are not called recursively. */
+ ap_assert(!m->polling);
+ m->polling = 1;
+ do { /* runs once: the body ends in an unconditional break, errors jump to cleanup */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
+ H2_MPLX_MSG(m, "enter polling timeout=%d"),
+ (int)apr_time_sec(timeout)); /* logged in whole seconds */
+
+ apr_array_clear(m->streams_ev_in);
+ apr_array_clear(m->streams_ev_out);
+
+ do {
+ /* add streams we started processing in the meantime */
+ apr_thread_mutex_lock(m->poll_lock);
+ if (!h2_iq_empty(m->streams_input_read)
+ || !h2_iq_empty(m->streams_output_written)) {
+ while ((i = h2_iq_shift(m->streams_input_read))) { /* a shifted value of 0 ends the loop */
+ stream = h2_ihash_get(m->streams, i);
+ if (stream) {
+ APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = stream;
+ }
+ }
+ while ((i = h2_iq_shift(m->streams_output_written))) {
+ stream = h2_ihash_get(m->streams, i);
+ if (stream) {
+ APR_ARRAY_PUSH(m->streams_ev_out, h2_stream*) = stream;
+ }
+ }
+ nresults = 0; /* no poll was done, so the results loop below is skipped */
+ rv = APR_SUCCESS;
+ apr_thread_mutex_unlock(m->poll_lock);
+ break;
+ }
+ apr_thread_mutex_unlock(m->poll_lock);
+
+ H2_MPLX_LEAVE(m); /* do not stay entered while blocking in the poll */
+ rv = apr_pollset_poll(m->pollset, timeout >= 0? timeout : -1, &nresults, &results);
+ H2_MPLX_ENTER_ALWAYS(m);
+ if (APR_STATUS_IS_EINTR(rv) && m->shutdown) {
+ if (!m->aborted) {
+ rv = APR_SUCCESS; /* interrupted by a non-aborted shutdown: not an error */
+ }
+ goto cleanup;
+ }
+ } while (APR_STATUS_IS_EINTR(rv)); /* interrupted without shutdown: check the queues and poll again */
- H2_MPLX_LEAVE(m);
- return waiting;
+ if (APR_SUCCESS != rv) {
+ if (APR_STATUS_IS_TIMEUP(rv)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
+ H2_MPLX_MSG(m, "polling timed out "));
+ }
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1, APLOGNO(10310) \
+ H2_MPLX_MSG(m, "polling failed"));
+ }
+ goto cleanup; /* both timeout and failure are returned to the caller as rv */
+ }
+
+ for (i = 0; i < nresults; i++) {
+ pfd = &results[i];
+ conn_ctx = pfd->client_data;
+
+ AP_DEBUG_ASSERT(conn_ctx);
+ if (conn_ctx->stream_id == 0) { /* only results for stream id 0 are acted upon here */
+ if (on_stream_input) {
+ APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = m->stream0;
+ }
+ continue;
+ }
+ }
+
+ if (on_stream_input && m->streams_ev_in->nelts) {
+ H2_MPLX_LEAVE(m); /* callbacks run while the mplx is left */
+ for (i = 0; i < m->streams_ev_in->nelts; ++i) {
+ on_stream_input(on_ctx, APR_ARRAY_IDX(m->streams_ev_in, i, h2_stream*));
+ }
+ H2_MPLX_ENTER_ALWAYS(m);
+ }
+ if (on_stream_output && m->streams_ev_out->nelts) {
+ H2_MPLX_LEAVE(m);
+ for (i = 0; i < m->streams_ev_out->nelts; ++i) {
+ on_stream_output(on_ctx, APR_ARRAY_IDX(m->streams_ev_out, i, h2_stream*));
+ }
+ H2_MPLX_ENTER_ALWAYS(m);
+ }
+ break;
+ } while(1);
+
+cleanup:
+ m->polling = 0; /* allow the next call; pairs with the assert at the top */
+ return rv;
}
+