summaryrefslogtreecommitdiffstats
path: root/modules/http2/h2_stream.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--modules/http2/h2_stream.c1520
1 files changed, 1135 insertions, 385 deletions
diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c
index 24ebc56..ee87555 100644
--- a/modules/http2/h2_stream.c
+++ b/modules/http2/h2_stream.c
@@ -17,34 +17,39 @@
#include <assert.h>
#include <stddef.h>
-#include <apr_strings.h>
+#include "apr.h"
+#include "apr_strings.h"
+#include "apr_lib.h"
+#include "apr_strmatch.h"
#include <httpd.h>
#include <http_core.h>
#include <http_connection.h>
#include <http_log.h>
+#include <http_protocol.h>
+#include <http_ssl.h>
#include <nghttp2/nghttp2.h>
#include "h2_private.h"
#include "h2.h"
#include "h2_bucket_beam.h"
-#include "h2_conn.h"
+#include "h2_c1.h"
#include "h2_config.h"
-#include "h2_h2.h"
+#include "h2_protocol.h"
#include "h2_mplx.h"
#include "h2_push.h"
#include "h2_request.h"
#include "h2_headers.h"
#include "h2_session.h"
#include "h2_stream.h"
-#include "h2_task.h"
-#include "h2_ctx.h"
-#include "h2_task.h"
+#include "h2_c2.h"
+#include "h2_conn_ctx.h"
+#include "h2_c2.h"
#include "h2_util.h"
-static const char *h2_ss_str(h2_stream_state_t state)
+static const char *h2_ss_str(const h2_stream_state_t state)
{
switch (state) {
case H2_SS_IDLE:
@@ -68,7 +73,7 @@ static const char *h2_ss_str(h2_stream_state_t state)
}
}
-const char *h2_stream_state_str(h2_stream *stream)
+const char *h2_stream_state_str(const h2_stream *stream)
{
return h2_ss_str(stream->state);
}
@@ -120,7 +125,8 @@ static int trans_on_event[][H2_SS_MAX] = {
{ S_XXX, S_ERR, S_ERR, S_CL_L, S_CLS, S_XXX, S_XXX, S_XXX, },/* EV_CLOSED_L*/
{ S_ERR, S_ERR, S_ERR, S_CL_R, S_ERR, S_CLS, S_NOP, S_NOP, },/* EV_CLOSED_R*/
{ S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },/* EV_CANCELLED*/
-{ S_NOP, S_XXX, S_XXX, S_XXX, S_XXX, S_CLS, S_CLN, S_XXX, },/* EV_EOS_SENT*/
+{ S_NOP, S_XXX, S_XXX, S_XXX, S_XXX, S_CLS, S_CLN, S_NOP, },/* EV_EOS_SENT*/
+{ S_NOP, S_XXX, S_CLS, S_XXX, S_XXX, S_CLS, S_XXX, S_XXX, },/* EV_IN_ERROR*/
};
static int on_map(h2_stream_state_t state, int map[H2_SS_MAX])
@@ -142,7 +148,7 @@ static int on_frame(h2_stream_state_t state, int frame_type,
{
ap_assert(frame_type >= 0);
ap_assert(state >= 0);
- if (frame_type >= maxlen) {
+ if ((apr_size_t)frame_type >= maxlen) {
return state; /* NOP, ignore unknown frame types */
}
return on_map(state, frame_map[frame_type]);
@@ -160,6 +166,7 @@ static int on_frame_recv(h2_stream_state_t state, int frame_type)
static int on_event(h2_stream* stream, h2_stream_event_t ev)
{
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
if (stream->monitor && stream->monitor->on_event) {
stream->monitor->on_event(stream->monitor->ctx, stream, ev);
}
@@ -169,10 +176,18 @@ static int on_event(h2_stream* stream, h2_stream_event_t ev)
return stream->state;
}
+static ssize_t stream_data_cb(nghttp2_session *ng2s,
+ int32_t stream_id,
+ uint8_t *buf,
+ size_t length,
+ uint32_t *data_flags,
+ nghttp2_data_source *source,
+ void *puser);
+
static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, const char *tag)
{
- if (APLOG_C_IS_LEVEL(s->session->c, lvl)) {
- conn_rec *c = s->session->c;
+ if (APLOG_C_IS_LEVEL(s->session->c1, lvl)) {
+ conn_rec *c = s->session->c1;
char buffer[4 * 1024];
apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]);
@@ -182,76 +197,116 @@ static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, const char *tag)
}
}
-static apr_status_t setup_input(h2_stream *stream) {
- if (stream->input == NULL) {
- int empty = (stream->input_eof
- && (!stream->in_buffer
- || APR_BRIGADE_EMPTY(stream->in_buffer)));
- if (!empty) {
- h2_beam_create(&stream->input, stream->pool, stream->id,
- "input", H2_BEAM_OWNER_SEND, 0,
- stream->session->s->timeout);
- h2_beam_send_from(stream->input, stream->pool);
- }
+static void stream_setup_input(h2_stream *stream)
+{
+ if (stream->input != NULL) return;
+ ap_assert(!stream->input_closed);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c1,
+ H2_STRM_MSG(stream, "setup input beam"));
+ h2_beam_create(&stream->input, stream->session->c1,
+ stream->pool, stream->id,
+ "input", 0, stream->session->s->timeout);
+}
+
+apr_status_t h2_stream_prepare_processing(h2_stream *stream)
+{
+ /* Right before processing starts, last chance to decide if
+ * there is need to an input beam. */
+ if (!stream->input_closed) {
+ stream_setup_input(stream);
}
return APR_SUCCESS;
}
-static apr_status_t close_input(h2_stream *stream)
+static int input_buffer_is_empty(h2_stream *stream)
+{
+ return !stream->in_buffer || APR_BRIGADE_EMPTY(stream->in_buffer);
+}
+
+static apr_status_t input_flush(h2_stream *stream)
{
- conn_rec *c = stream->session->c;
apr_status_t status = APR_SUCCESS;
+ apr_off_t written;
- stream->input_eof = 1;
- if (stream->input && h2_beam_is_closed(stream->input)) {
- return APR_SUCCESS;
- }
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
- H2_STRM_MSG(stream, "closing input"));
- if (stream->rst_error) {
- return APR_ECONNRESET;
+ if (input_buffer_is_empty(stream)) goto cleanup;
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c1,
+ H2_STRM_MSG(stream, "flush input"));
+ status = h2_beam_send(stream->input, stream->session->c1,
+ stream->in_buffer, APR_BLOCK_READ, &written);
+ stream->in_last_write = apr_time_now();
+ if (APR_SUCCESS != status && h2_stream_is_at(stream, H2_SS_CLOSED_L)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c1,
+ H2_STRM_MSG(stream, "send input error"));
+ h2_stream_dispatch(stream, H2_SEV_IN_ERROR);
}
-
- if (stream->trailers && !apr_is_empty_table(stream->trailers)) {
- apr_bucket *b;
- h2_headers *r;
-
- if (!stream->in_buffer) {
- stream->in_buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
- }
-
- r = h2_headers_create(HTTP_OK, stream->trailers, NULL,
- stream->in_trailer_octets, stream->pool);
- stream->trailers = NULL;
- b = h2_bucket_headers_create(c->bucket_alloc, r);
- APR_BRIGADE_INSERT_TAIL(stream->in_buffer, b);
-
- b = apr_bucket_eos_create(c->bucket_alloc);
- APR_BRIGADE_INSERT_TAIL(stream->in_buffer, b);
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
- H2_STRM_MSG(stream, "added trailers"));
- h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING);
+cleanup:
+ return status;
+}
+
+static void input_append_bucket(h2_stream *stream, apr_bucket *b)
+{
+ if (!stream->in_buffer) {
+ stream_setup_input(stream);
+ stream->in_buffer = apr_brigade_create(
+ stream->pool, stream->session->c1->bucket_alloc);
}
- if (stream->input) {
- h2_stream_flush_input(stream);
- return h2_beam_close(stream->input);
+ APR_BRIGADE_INSERT_TAIL(stream->in_buffer, b);
+}
+
+static void input_append_data(h2_stream *stream, const char *data, apr_size_t len)
+{
+ if (!stream->in_buffer) {
+ stream_setup_input(stream);
+ stream->in_buffer = apr_brigade_create(
+ stream->pool, stream->session->c1->bucket_alloc);
}
- return status;
+ apr_brigade_write(stream->in_buffer, NULL, NULL, data, len);
}
-static apr_status_t close_output(h2_stream *stream)
+
+static apr_status_t close_input(h2_stream *stream)
{
- if (!stream->output || h2_beam_is_closed(stream->output)) {
- return APR_SUCCESS;
+ conn_rec *c = stream->session->c1;
+ apr_status_t rv = APR_SUCCESS;
+ apr_bucket *b;
+
+ if (stream->input_closed) goto cleanup;
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c1,
+ H2_STRM_MSG(stream, "closing input"));
+ if (!stream->rst_error
+ && stream->trailers_in
+ && !apr_is_empty_table(stream->trailers_in)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c1,
+ H2_STRM_MSG(stream, "adding trailers"));
+#if AP_HAS_RESPONSE_BUCKETS
+ b = ap_bucket_headers_create(stream->trailers_in,
+ stream->pool, c->bucket_alloc);
+#else
+ b = h2_bucket_headers_create(c->bucket_alloc,
+ h2_headers_create(HTTP_OK, stream->trailers_in, NULL,
+ stream->in_trailer_octets, stream->pool));
+#endif
+ input_append_bucket(stream, b);
+ stream->trailers_in = NULL;
+ }
+
+ stream->input_closed = 1;
+ if (stream->input) {
+ b = apr_bucket_eos_create(c->bucket_alloc);
+ input_append_bucket(stream, b);
+ input_flush(stream);
+ h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c1,
+ H2_STRM_MSG(stream, "input flush + EOS"));
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
- H2_STRM_MSG(stream, "closing output"));
- return h2_beam_leave(stream->output);
+
+cleanup:
+ return rv;
}
-static void on_state_enter(h2_stream *stream)
+static void on_state_enter(h2_stream *stream)
{
if (stream->monitor && stream->monitor->on_state_enter) {
stream->monitor->on_state_enter(stream->monitor->ctx, stream);
@@ -271,7 +326,7 @@ static void on_state_invalid(h2_stream *stream)
stream->monitor->on_state_invalid(stream->monitor->ctx, stream);
}
/* stream got an event/frame invalid in its state */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c1,
H2_STRM_MSG(stream, "invalid state event"));
switch (stream->state) {
case H2_SS_OPEN:
@@ -288,17 +343,17 @@ static void on_state_invalid(h2_stream *stream)
static apr_status_t transit(h2_stream *stream, int new_state)
{
- if (new_state == stream->state) {
+ if ((h2_stream_state_t)new_state == stream->state) {
return APR_SUCCESS;
}
else if (new_state < 0) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c1,
H2_STRM_LOG(APLOGNO(03081), stream, "invalid transition"));
on_state_invalid(stream);
return APR_EINVAL;
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c1,
H2_STRM_MSG(stream, "transit to [%s]"), h2_ss_str(new_state));
stream->state = new_state;
switch (new_state) {
@@ -312,14 +367,12 @@ static apr_status_t transit(h2_stream *stream, int new_state)
case H2_SS_OPEN:
break;
case H2_SS_CLOSED_L:
- close_output(stream);
break;
case H2_SS_CLOSED_R:
close_input(stream);
break;
case H2_SS_CLOSED:
close_input(stream);
- close_output(stream);
if (stream->out_buffer) {
apr_brigade_cleanup(stream->out_buffer);
}
@@ -340,19 +393,20 @@ void h2_stream_dispatch(h2_stream *stream, h2_stream_event_t ev)
{
int new_state;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c1,
H2_STRM_MSG(stream, "dispatch event %d"), ev);
new_state = on_event(stream, ev);
if (new_state < 0) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c1,
H2_STRM_LOG(APLOGNO(10002), stream, "invalid event %d"), ev);
on_state_invalid(stream);
AP_DEBUG_ASSERT(new_state > S_XXX);
return;
}
- else if (new_state == stream->state) {
+ else if ((h2_stream_state_t)new_state == stream->state) {
/* nop */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c1,
H2_STRM_MSG(stream, "non-state event %d"), ev);
return;
}
@@ -365,9 +419,7 @@ void h2_stream_dispatch(h2_stream *stream, h2_stream_event_t ev)
static void set_policy_for(h2_stream *stream, h2_request *r)
{
int enabled = h2_session_push_enabled(stream->session);
- stream->push_policy = h2_push_policy_determine(r->headers, stream->pool,
- enabled);
- r->serialize = h2_config_geti(stream->session->config, H2_CONF_SER_HEADERS);
+ stream->push_policy = h2_push_policy_determine(r->headers, stream->pool, enabled);
}
apr_status_t h2_stream_send_frame(h2_stream *stream, int ftype, int flags, size_t frame_len)
@@ -375,9 +427,10 @@ apr_status_t h2_stream_send_frame(h2_stream *stream, int ftype, int flags, size_
apr_status_t status = APR_SUCCESS;
int new_state, eos = 0;
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
new_state = on_frame_send(stream->state, ftype);
if (new_state < 0) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c1,
H2_STRM_MSG(stream, "invalid frame %d send"), ftype);
AP_DEBUG_ASSERT(new_state > S_XXX);
return transit(stream, new_state);
@@ -385,6 +438,12 @@ apr_status_t h2_stream_send_frame(h2_stream *stream, int ftype, int flags, size_
++stream->out_frames;
stream->out_frame_octets += frame_len;
+ if(stream->c2) {
+ h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(stream->c2);
+ if(conn_ctx)
+ conn_ctx->bytes_sent = stream->out_frame_octets;
+ }
+
switch (ftype) {
case NGHTTP2_DATA:
eos = (flags & NGHTTP2_FLAG_END_STREAM);
@@ -398,24 +457,18 @@ apr_status_t h2_stream_send_frame(h2_stream *stream, int ftype, int flags, size_
/* start pushed stream */
ap_assert(stream->request == NULL);
ap_assert(stream->rtmp != NULL);
- status = h2_request_end_headers(stream->rtmp, stream->pool, 1, 0);
- if (status != APR_SUCCESS) {
- return status;
- }
- set_policy_for(stream, stream->rtmp);
- stream->request = stream->rtmp;
- stream->rtmp = NULL;
+ status = h2_stream_end_headers(stream, 1, 0);
+ if (status != APR_SUCCESS) goto leave;
break;
default:
break;
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
- H2_STRM_MSG(stream, "send frame %d, eos=%d"), ftype, eos);
status = transit(stream, new_state);
if (status == APR_SUCCESS && eos) {
status = transit(stream, on_event(stream, H2_SEV_CLOSED_L));
}
+leave:
return status;
}
@@ -424,9 +477,10 @@ apr_status_t h2_stream_recv_frame(h2_stream *stream, int ftype, int flags, size_
apr_status_t status = APR_SUCCESS;
int new_state, eos = 0;
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
new_state = on_frame_recv(stream->state, ftype);
if (new_state < 0) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c1,
H2_STRM_MSG(stream, "invalid frame %d recv"), ftype);
AP_DEBUG_ASSERT(new_state > S_XXX);
return transit(stream, new_state);
@@ -439,7 +493,7 @@ apr_status_t h2_stream_recv_frame(h2_stream *stream, int ftype, int flags, size_
case NGHTTP2_HEADERS:
eos = (flags & NGHTTP2_FLAG_END_STREAM);
- if (stream->state == H2_SS_OPEN) {
+ if (h2_stream_is_at_or_past(stream, H2_SS_OPEN)) {
/* trailer HEADER */
if (!eos) {
h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR);
@@ -451,18 +505,13 @@ apr_status_t h2_stream_recv_frame(h2_stream *stream, int ftype, int flags, size_
ap_assert(stream->request == NULL);
if (stream->rtmp == NULL) {
/* This can only happen, if the stream has received no header
- * name/value pairs at all. The lastest nghttp2 version have become
+ * name/value pairs at all. The latest nghttp2 version have become
* pretty good at detecting this early. In any case, we have
* to abort the connection here, since this is clearly a protocol error */
return APR_EINVAL;
}
- status = h2_request_end_headers(stream->rtmp, stream->pool, eos, frame_len);
- if (status != APR_SUCCESS) {
- return status;
- }
- set_policy_for(stream, stream->rtmp);
- stream->request = stream->rtmp;
- stream->rtmp = NULL;
+ status = h2_stream_end_headers(stream, eos, frame_len);
+ if (status != APR_SUCCESS) goto leave;
}
break;
@@ -473,22 +522,7 @@ apr_status_t h2_stream_recv_frame(h2_stream *stream, int ftype, int flags, size_
if (status == APR_SUCCESS && eos) {
status = transit(stream, on_event(stream, H2_SEV_CLOSED_R));
}
- return status;
-}
-
-apr_status_t h2_stream_flush_input(h2_stream *stream)
-{
- apr_status_t status = APR_SUCCESS;
-
- if (stream->in_buffer && !APR_BRIGADE_EMPTY(stream->in_buffer)) {
- setup_input(stream);
- status = h2_beam_send(stream->input, stream->in_buffer, APR_BLOCK_READ);
- stream->in_last_write = apr_time_now();
- }
- if (stream->input_eof
- && stream->input && !h2_beam_is_closed(stream->input)) {
- status = h2_beam_close(stream->input);
- }
+leave:
return status;
}
@@ -498,41 +532,59 @@ apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags,
h2_session *session = stream->session;
apr_status_t status = APR_SUCCESS;
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
stream->in_data_frames++;
if (len > 0) {
- if (APLOGctrace3(session->c)) {
+ if (APLOGctrace3(session->c1)) {
const char *load = apr_pstrndup(stream->pool, (const char *)data, len);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, session->c1,
H2_STRM_MSG(stream, "recv DATA, len=%d: -->%s<--"),
(int)len, load);
}
else {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c1,
H2_STRM_MSG(stream, "recv DATA, len=%d"), (int)len);
}
stream->in_data_octets += len;
- if (!stream->in_buffer) {
- stream->in_buffer = apr_brigade_create(stream->pool,
- session->c->bucket_alloc);
- }
- apr_brigade_write(stream->in_buffer, NULL, NULL, (const char *)data, len);
+ input_append_data(stream, (const char*)data, len);
+ input_flush(stream);
h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING);
}
return status;
}
-static void prep_output(h2_stream *stream) {
- conn_rec *c = stream->session->c;
- if (!stream->out_buffer) {
- stream->out_buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
+#ifdef AP_DEBUG
+static apr_status_t stream_pool_destroy(void *data)
+{
+ h2_stream *stream = data;
+ switch (stream->magic) {
+ case H2_STRM_MAGIC_OK:
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, stream->session->c1,
+ H2_STRM_MSG(stream, "was not destroyed explicitly"));
+ AP_DEBUG_ASSERT(0);
+ break;
+ case H2_STRM_MAGIC_SDEL:
+ /* stream has been explicitly destroyed, as it should */
+ H2_STRM_ASSIGN_MAGIC(stream, H2_STRM_MAGIC_PDEL);
+ break;
+ case H2_STRM_MAGIC_PDEL:
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, stream->session->c1,
+ H2_STRM_MSG(stream, "already pool destroyed"));
+ AP_DEBUG_ASSERT(0);
+ break;
+ default:
+ AP_DEBUG_ASSERT(0);
}
+ return APR_SUCCESS;
}
+#endif
h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session,
h2_stream_monitor *monitor, int initiated_on)
{
h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
-
+
+ H2_STRM_ASSIGN_MAGIC(stream, H2_STRM_MAGIC_OK);
stream->id = id;
stream->initiated_on = initiated_on;
stream->created = apr_time_now();
@@ -540,15 +592,21 @@ h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session,
stream->pool = pool;
stream->session = session;
stream->monitor = monitor;
- stream->max_mem = session->max_stream_mem;
-
-#ifdef H2_NG2_LOCAL_WIN_SIZE
- stream->in_window_size =
- nghttp2_session_get_stream_local_window_size(
- stream->session->ngh2, stream->id);
+#ifdef AP_DEBUG
+ if (id) { /* stream 0 has special lifetime */
+ apr_pool_cleanup_register(pool, stream, stream_pool_destroy,
+ apr_pool_cleanup_null);
+ }
#endif
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+#ifdef H2_NG2_LOCAL_WIN_SIZE
+ if (id) {
+ stream->in_window_size =
+ nghttp2_session_get_stream_local_window_size(
+ stream->session->ngh2, stream->id);
+ }
+#endif
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c1,
H2_STRM_LOG(APLOGNO(03082), stream, "created"));
on_state_enter(stream);
return stream;
@@ -556,59 +614,35 @@ h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session,
void h2_stream_cleanup(h2_stream *stream)
{
- apr_status_t status;
-
+ /* Stream is done on c1. There might still be processing on a c2
+ * going on. The input/output beams get aborted and the stream's
+ * end of the in/out notifications get closed.
+ */
ap_assert(stream);
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
if (stream->out_buffer) {
- /* remove any left over output buckets that may still have
- * references into request pools */
apr_brigade_cleanup(stream->out_buffer);
}
- if (stream->input) {
- h2_beam_abort(stream->input);
- status = h2_beam_wait_empty(stream->input, APR_NONBLOCK_READ);
- if (status == APR_EAGAIN) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
- H2_STRM_MSG(stream, "wait on input drain"));
- status = h2_beam_wait_empty(stream->input, APR_BLOCK_READ);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c,
- H2_STRM_MSG(stream, "input drain returned"));
- }
- }
}
void h2_stream_destroy(h2_stream *stream)
{
ap_assert(stream);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, stream->session->c,
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, stream->session->c1,
H2_STRM_MSG(stream, "destroy"));
+ H2_STRM_ASSIGN_MAGIC(stream, H2_STRM_MAGIC_SDEL);
apr_pool_destroy(stream->pool);
}
-apr_status_t h2_stream_prep_processing(h2_stream *stream)
-{
- if (stream->request) {
- const h2_request *r = stream->request;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
- H2_STRM_MSG(stream, "schedule %s %s://%s%s chunked=%d"),
- r->method, r->scheme, r->authority, r->path, r->chunked);
- setup_input(stream);
- stream->scheduled = 1;
- return APR_SUCCESS;
- }
- return APR_EINVAL;
-}
-
void h2_stream_rst(h2_stream *stream, int error_code)
{
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
stream->rst_error = error_code;
- if (stream->input) {
- h2_beam_abort(stream->input);
- }
- if (stream->output) {
- h2_beam_leave(stream->output);
+ if (stream->c2) {
+ h2_c2_abort(stream->c2, stream->session->c1);
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c1,
H2_STRM_MSG(stream, "reset, error=%d"), error_code);
h2_stream_dispatch(stream, H2_SEV_CANCELLED);
}
@@ -619,6 +653,7 @@ apr_status_t h2_stream_set_request_rec(h2_stream *stream,
h2_request *req;
apr_status_t status;
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
ap_assert(stream->request == NULL);
ap_assert(stream->rtmp == NULL);
if (stream->rst_error) {
@@ -640,6 +675,7 @@ apr_status_t h2_stream_set_request_rec(h2_stream *stream,
void h2_stream_set_request(h2_stream *stream, const h2_request *r)
{
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
ap_assert(stream->request == NULL);
ap_assert(stream->rtmp == NULL);
stream->rtmp = h2_request_clone(stream->pool, r);
@@ -647,43 +683,46 @@ void h2_stream_set_request(h2_stream *stream, const h2_request *r)
static void set_error_response(h2_stream *stream, int http_status)
{
- if (!h2_stream_is_ready(stream)) {
- conn_rec *c = stream->session->c;
- apr_bucket *b;
- h2_headers *response;
-
- response = h2_headers_die(http_status, stream->request, stream->pool);
- prep_output(stream);
- b = apr_bucket_eos_create(c->bucket_alloc);
- APR_BRIGADE_INSERT_HEAD(stream->out_buffer, b);
- b = h2_bucket_headers_create(c->bucket_alloc, response);
- APR_BRIGADE_INSERT_HEAD(stream->out_buffer, b);
+ if (!h2_stream_is_ready(stream) && stream->rtmp) {
+ stream->rtmp->http_status = http_status;
}
}
static apr_status_t add_trailer(h2_stream *stream,
const char *name, size_t nlen,
- const char *value, size_t vlen)
+ const char *value, size_t vlen,
+ size_t max_field_len, int *pwas_added)
{
- conn_rec *c = stream->session->c;
+ conn_rec *c = stream->session->c1;
char *hname, *hvalue;
+ const char *existing;
+ *pwas_added = 0;
if (nlen == 0 || name[0] == ':') {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, APR_EINVAL, c,
H2_STRM_LOG(APLOGNO(03060), stream,
"pseudo header in trailer"));
return APR_EINVAL;
}
- if (h2_req_ignore_trailer(name, nlen)) {
+ if (h2_ignore_req_trailer(name, nlen)) {
return APR_SUCCESS;
}
- if (!stream->trailers) {
- stream->trailers = apr_table_make(stream->pool, 5);
+ if (!stream->trailers_in) {
+ stream->trailers_in = apr_table_make(stream->pool, 5);
}
hname = apr_pstrndup(stream->pool, name, nlen);
- hvalue = apr_pstrndup(stream->pool, value, vlen);
h2_util_camel_case_header(hname, nlen);
- apr_table_mergen(stream->trailers, hname, hvalue);
+ existing = apr_table_get(stream->trailers_in, hname);
+ if (max_field_len
+ && ((existing? strlen(existing)+2 : 0) + vlen + nlen + 2 > max_field_len)) {
+ /* "key: (oldval, )?nval" is too long */
+ return APR_EINVAL;
+ }
+ if (!existing) *pwas_added = 1;
+ hvalue = apr_pstrndup(stream->pool, value, vlen);
+ apr_table_mergen(stream->trailers_in, hname, hvalue);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
+ H2_STRM_MSG(stream, "added trailer '%s: %s'"), hname, hvalue);
return APR_SUCCESS;
}
@@ -693,274 +732,487 @@ apr_status_t h2_stream_add_header(h2_stream *stream,
const char *value, size_t vlen)
{
h2_session *session = stream->session;
- int error = 0;
- apr_status_t status;
+ int error = 0, was_added = 0;
+ apr_status_t status = APR_SUCCESS;
- if (stream->has_response) {
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
+ if (stream->response) {
return APR_EINVAL;
}
- ++stream->request_headers_added;
+
if (name[0] == ':') {
- if ((vlen) > session->s->limit_req_line) {
+ if (vlen > APR_INT32_MAX || (int)vlen > session->s->limit_req_line) {
/* pseudo header: approximation of request line size check */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
- H2_STRM_MSG(stream, "pseudo %s too long"), name);
+ if (!h2_stream_is_ready(stream)) {
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, session->c1,
+ H2_STRM_LOG(APLOGNO(10178), stream,
+ "Request pseudo header exceeds "
+ "LimitRequestFieldSize: %s"), name);
+ }
error = HTTP_REQUEST_URI_TOO_LARGE;
+ goto cleanup;
}
}
- else if ((nlen + 2 + vlen) > session->s->limit_req_fieldsize) {
+
+ if (session->s->limit_req_fields > 0
+ && stream->request_headers_added > session->s->limit_req_fields) {
+ /* already over limit, count this attempt, but do not take it in */
+ ++stream->request_headers_added;
+ }
+ else if (H2_SS_IDLE == stream->state) {
+ if (!stream->rtmp) {
+ stream->rtmp = h2_request_create(stream->id, stream->pool,
+ NULL, NULL, NULL, NULL, NULL);
+ }
+ status = h2_request_add_header(stream->rtmp, stream->pool,
+ name, nlen, value, vlen,
+ session->s->limit_req_fieldsize, &was_added);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c1,
+ H2_STRM_MSG(stream, "add_header: '%.*s: %.*s"),
+ (int)nlen, name, (int)vlen, value);
+ if (was_added) ++stream->request_headers_added;
+ }
+ else if (H2_SS_OPEN == stream->state) {
+ status = add_trailer(stream, name, nlen, value, vlen,
+ session->s->limit_req_fieldsize, &was_added);
+ if (was_added) ++stream->request_headers_added;
+ }
+ else {
+ status = APR_EINVAL;
+ goto cleanup;
+ }
+
+ if (APR_EINVAL == status) {
/* header too long */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
- H2_STRM_MSG(stream, "header %s too long"), name);
+ if (!h2_stream_is_ready(stream)) {
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, session->c1,
+ H2_STRM_LOG(APLOGNO(10180), stream,"Request header exceeds "
+ "LimitRequestFieldSize: %.*s"),
+ (int)H2MIN(nlen, 80), name);
+ }
error = HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
+ goto cleanup;
}
- if (stream->request_headers_added > session->s->limit_req_fields + 4) {
- /* too many header lines, include 4 pseudo headers */
- if (stream->request_headers_added
- > session->s->limit_req_fields + 4 + 100) {
- /* yeah, right */
+ if (session->s->limit_req_fields > 0
+ && stream->request_headers_added > session->s->limit_req_fields) {
+ /* too many header lines */
+ if (stream->request_headers_added > session->s->limit_req_fields + 100) {
+ /* yeah, right, this request is way over the limit, say goodbye */
h2_stream_rst(stream, H2_ERR_ENHANCE_YOUR_CALM);
return APR_ECONNRESET;
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
- H2_STRM_MSG(stream, "too many header lines"));
+ if (!h2_stream_is_ready(stream)) {
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, session->c1,
+ H2_STRM_LOG(APLOGNO(10181), stream, "Number of request headers "
+ "exceeds LimitRequestFields"));
+ }
error = HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
+ goto cleanup;
}
+cleanup:
if (error) {
+ ++stream->request_headers_failed;
set_error_response(stream, error);
return APR_EINVAL;
}
- else if (H2_SS_IDLE == stream->state) {
- if (!stream->rtmp) {
- stream->rtmp = h2_req_create(stream->id, stream->pool,
- NULL, NULL, NULL, NULL, NULL, 0);
- }
- status = h2_request_add_header(stream->rtmp, stream->pool,
- name, nlen, value, vlen);
+ else if (status != APR_SUCCESS) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c1,
+ H2_STRM_MSG(stream, "header %s not accepted"), name);
+ h2_stream_dispatch(stream, H2_SEV_CANCELLED);
}
- else if (H2_SS_OPEN == stream->state) {
- status = add_trailer(stream, name, nlen, value, vlen);
+ return status;
+}
+
+typedef struct {
+ apr_size_t maxlen;
+ const char *failed_key;
+} val_len_check_ctx;
+
+static int table_check_val_len(void *baton, const char *key, const char *value)
+{
+ val_len_check_ctx *ctx = baton;
+
+ if (strlen(value) <= ctx->maxlen) return 1;
+ ctx->failed_key = key;
+ return 0;
+}
+
+apr_status_t h2_stream_end_headers(h2_stream *stream, int eos, size_t raw_bytes)
+{
+ apr_status_t status;
+ val_len_check_ctx ctx;
+ int is_http_or_https;
+ h2_request *req = stream->rtmp;
+
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
+ status = h2_request_end_headers(req, stream->pool, raw_bytes);
+ if (APR_SUCCESS != status || req->http_status != H2_HTTP_STATUS_UNSET) {
+ goto cleanup;
+ }
+
+ /* keep on returning APR_SUCCESS for error responses, so that we
+ * send it and do not RST the stream.
+ */
+ set_policy_for(stream, req);
+
+ ctx.maxlen = stream->session->s->limit_req_fieldsize;
+ ctx.failed_key = NULL;
+ apr_table_do(table_check_val_len, &ctx, req->headers, NULL);
+ if (ctx.failed_key) {
+ if (!h2_stream_is_ready(stream)) {
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, stream->session->c1,
+ H2_STRM_LOG(APLOGNO(10230), stream,"Request header exceeds "
+ "LimitRequestFieldSize: %.*s"),
+ (int)H2MIN(strlen(ctx.failed_key), 80), ctx.failed_key);
+ }
+ set_error_response(stream, HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE);
+ goto cleanup;
+ }
+
+ /* http(s) scheme. rfc7540, ch. 8.1.2.3:
+ * This [:path] pseudo-header field MUST NOT be empty for "http" or "https"
+ * URIs; "http" or "https" URIs that do not contain a path component
+ * MUST include a value of '/'. The exception to this rule is an
+ * OPTIONS request for an "http" or "https" URI that does not include
+ * a path component; these MUST include a ":path" pseudo-header field
+ * with a value of '*'
+ *
+ * All HTTP/2 requests MUST include exactly one valid value for the
+ * ":method", ":scheme", and ":path" pseudo-header fields, unless it is
+ * a CONNECT request.
+ */
+ is_http_or_https = (!req->scheme
+ || !(ap_cstr_casecmpn(req->scheme, "http", 4) != 0
+ || (req->scheme[4] != '\0'
+ && (apr_tolower(req->scheme[4]) != 's'
+ || req->scheme[5] != '\0'))));
+
+ /* CONNECT. rfc7540, ch. 8.3:
+ * In HTTP/2, the CONNECT method is used to establish a tunnel over a
+ * single HTTP/2 stream to a remote host for similar purposes. The HTTP
+ * header field mapping works as defined in Section 8.1.2.3 ("Request
+ * Pseudo-Header Fields"), with a few differences. Specifically:
+ * o The ":method" pseudo-header field is set to "CONNECT".
+ * o The ":scheme" and ":path" pseudo-header fields MUST be omitted.
+ * o The ":authority" pseudo-header field contains the host and port to
+ * connect to (equivalent to the authority-form of the request-target
+ * of CONNECT requests (see [RFC7230], Section 5.3)).
+ */
+ if (!ap_cstr_casecmp(req->method, "CONNECT")) {
+ if (req->protocol) {
+ if (!strcmp("websocket", req->protocol)) {
+ if (!req->scheme || !req->path) {
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, stream->session->c1,
+ H2_STRM_LOG(APLOGNO(10457), stream, "Request to websocket CONNECT "
+ "without :scheme or :path, sending 400 answer"));
+ set_error_response(stream, HTTP_BAD_REQUEST);
+ goto cleanup;
+ }
+ }
+ else {
+ /* do not know that protocol */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c1, APLOGNO(10460)
+ "':protocol: %s' header present in %s request",
+ req->protocol, req->method);
+ set_error_response(stream, HTTP_NOT_IMPLEMENTED);
+ goto cleanup;
+ }
+ }
+ else if (req->scheme || req->path) {
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, stream->session->c1,
+ H2_STRM_LOG(APLOGNO(10384), stream, "Request to CONNECT "
+ "with :scheme or :path specified, sending 400 answer"));
+ set_error_response(stream, HTTP_BAD_REQUEST);
+ goto cleanup;
+ }
}
- else {
- status = APR_EINVAL;
+ else if (is_http_or_https) {
+ if (!req->path) {
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, stream->session->c1,
+ H2_STRM_LOG(APLOGNO(10385), stream, "Request for http(s) "
+ "resource without :path, sending 400 answer"));
+ set_error_response(stream, HTTP_BAD_REQUEST);
+ goto cleanup;
+ }
+ if (!req->scheme) {
+ req->scheme = ap_ssl_conn_is_ssl(stream->session->c1)? "https" : "http";
+ }
}
-
- if (status != APR_SUCCESS) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
- H2_STRM_MSG(stream, "header %s not accepted"), name);
- h2_stream_dispatch(stream, H2_SEV_CANCELLED);
+
+ if (req->scheme && (req->path && req->path[0] != '/')) {
+ /* We still have a scheme, which means we need to pass an absolute URI into
+ * our HTTP protocol handling and the missing '/' at the start will prevent
+ * us from doing so (as it then confuses path and authority). */
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, stream->session->c1,
+ H2_STRM_LOG(APLOGNO(10379), stream, "Request :scheme '%s' and "
+ "path '%s' do not allow creating an absolute URL. Failing "
+ "request with 400."), req->scheme, req->path);
+ set_error_response(stream, HTTP_BAD_REQUEST);
+ goto cleanup;
+ }
+
+cleanup:
+ if (APR_SUCCESS == status) {
+ stream->request = req;
+ stream->rtmp = NULL;
+
+ if (APLOGctrace4(stream->session->c1)) {
+ int i;
+ const apr_array_header_t *t_h = apr_table_elts(req->headers);
+ const apr_table_entry_t *t_elt = (apr_table_entry_t *)t_h->elts;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, stream->session->c1,
+ H2_STRM_MSG(stream,"headers received from client:"));
+ for (i = 0; i < t_h->nelts; i++, t_elt++) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, stream->session->c1,
+ H2_STRM_MSG(stream, " %s: %s"),
+ ap_escape_logitem(stream->pool, t_elt->key),
+ ap_escape_logitem(stream->pool, t_elt->val));
+ }
+ }
}
return status;
}
-static apr_bucket *get_first_headers_bucket(apr_bucket_brigade *bb)
+static apr_bucket *get_first_response_bucket(apr_bucket_brigade *bb)
{
if (bb) {
apr_bucket *b = APR_BRIGADE_FIRST(bb);
while (b != APR_BRIGADE_SENTINEL(bb)) {
+#if AP_HAS_RESPONSE_BUCKETS
+ if (AP_BUCKET_IS_RESPONSE(b)) {
+ return b;
+ }
+#else
if (H2_BUCKET_IS_HEADERS(b)) {
return b;
}
+#endif
b = APR_BUCKET_NEXT(b);
}
}
return NULL;
}
-static apr_status_t add_buffered_data(h2_stream *stream, apr_off_t requested,
- apr_off_t *plen, int *peos, int *is_all,
- h2_headers **pheaders)
+static void stream_do_error_bucket(h2_stream *stream, apr_bucket *b)
{
- apr_bucket *b, *e;
-
- *peos = 0;
- *plen = 0;
- *is_all = 0;
- if (pheaders) {
- *pheaders = NULL;
+ int err = ((ap_bucket_error *)(b->data))->status;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c1,
+ H2_STRM_MSG(stream, "error bucket received, err=%d"), err);
+ if (err >= 500) {
+ err = NGHTTP2_INTERNAL_ERROR;
}
-
- H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "add_buffered_data");
- b = APR_BRIGADE_FIRST(stream->out_buffer);
- while (b != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
- e = APR_BUCKET_NEXT(b);
- if (APR_BUCKET_IS_METADATA(b)) {
- if (APR_BUCKET_IS_FLUSH(b)) {
- APR_BUCKET_REMOVE(b);
- apr_bucket_destroy(b);
- }
- else if (APR_BUCKET_IS_EOS(b)) {
- *peos = 1;
- return APR_SUCCESS;
- }
- else if (H2_BUCKET_IS_HEADERS(b)) {
- if (*plen > 0) {
- /* data before the response, can only return up to here */
- return APR_SUCCESS;
- }
- else if (pheaders) {
- *pheaders = h2_bucket_headers_get(b);
- APR_BUCKET_REMOVE(b);
- apr_bucket_destroy(b);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
- H2_STRM_MSG(stream, "prep, -> response %d"),
- (*pheaders)->status);
- return APR_SUCCESS;
- }
- else {
- return APR_EAGAIN;
- }
- }
- }
- else if (b->length == 0) {
- APR_BUCKET_REMOVE(b);
- apr_bucket_destroy(b);
- }
- else {
- ap_assert(b->length != (apr_size_t)-1);
- *plen += b->length;
- if (*plen >= requested) {
- *plen = requested;
- return APR_SUCCESS;
- }
- }
- b = e;
+ else if (err >= 400) {
+ err = NGHTTP2_STREAM_CLOSED;
}
- *is_all = 1;
- return APR_SUCCESS;
+ else {
+ err = NGHTTP2_PROTOCOL_ERROR;
+ }
+ h2_stream_rst(stream, err);
}
-apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen,
- int *peos, h2_headers **pheaders)
+static apr_status_t buffer_output_receive(h2_stream *stream)
{
- apr_status_t status = APR_SUCCESS;
- apr_off_t requested, missing, max_chunk = H2_DATA_CHUNK_SIZE;
- conn_rec *c;
- int complete;
+ apr_status_t rv = APR_EAGAIN;
+ apr_off_t buf_len;
+ conn_rec *c1 = stream->session->c1;
+ apr_bucket *b, *e;
- ap_assert(stream);
-
+ if (!stream->output) {
+ goto cleanup;
+ }
if (stream->rst_error) {
- *plen = 0;
- *peos = 1;
- return APR_ECONNRESET;
+ rv = APR_ECONNRESET;
+ goto cleanup;
}
-
- c = stream->session->c;
- prep_output(stream);
- /* determine how much we'd like to send. We cannot send more than
- * is requested. But we can reduce the size in case the master
- * connection operates in smaller chunks. (TSL warmup) */
- if (stream->session->io.write_size > 0) {
- max_chunk = stream->session->io.write_size - 9; /* header bits */
+ if (!stream->out_buffer) {
+ stream->out_buffer = apr_brigade_create(stream->pool, c1->bucket_alloc);
+ buf_len = 0;
}
- requested = (*plen > 0)? H2MIN(*plen, max_chunk) : max_chunk;
-
- /* count the buffered data until eos or a headers bucket */
- status = add_buffered_data(stream, requested, plen, peos, &complete, pheaders);
-
- if (status == APR_EAGAIN) {
- /* TODO: ugly, someone needs to retrieve the response first */
- h2_mplx_keep_active(stream->session->mplx, stream);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
- H2_STRM_MSG(stream, "prep, response eagain"));
- return status;
+ else {
+ /* if the brigade contains a file bucket, its normal report length
+ * might be megabytes, but the memory used is tiny. For buffering,
+ * we are only interested in the memory footprint. */
+ buf_len = h2_brigade_mem_size(stream->out_buffer);
}
- else if (status != APR_SUCCESS) {
- return status;
+
+ if (buf_len > APR_INT32_MAX
+ || (apr_size_t)buf_len >= stream->session->max_stream_mem) {
+ /* we have buffered enough. No need to read more.
+ * However, we have now output pending for which we may not
+ * receive another poll event. We need to make sure that this
+ * stream is not suspended so we keep on processing output.
+ */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c1,
+ H2_STRM_MSG(stream, "out_buffer, already has %ld length"),
+ (long)buf_len);
+ rv = APR_SUCCESS;
+ goto cleanup;
}
-
- if (pheaders && *pheaders) {
- return APR_SUCCESS;
+
+ if (stream->output_eos) {
+ rv = APR_BRIGADE_EMPTY(stream->out_buffer)? APR_EOF : APR_SUCCESS;
}
-
- /* If there we do not have enough buffered data to satisfy the requested
- * length *and* we counted the _complete_ buffer (and did not stop in the middle
- * because of meta data there), lets see if we can read more from the
- * output beam */
- missing = H2MIN(requested, stream->max_mem) - *plen;
- if (complete && !*peos && missing > 0) {
- apr_status_t rv = APR_EOF;
-
- if (stream->output) {
- H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre");
- rv = h2_beam_receive(stream->output, stream->out_buffer,
- APR_NONBLOCK_READ, stream->max_mem - *plen);
- H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "post");
- }
-
- if (rv == APR_SUCCESS) {
- /* count the buffer again, now that we have read output */
- status = add_buffered_data(stream, requested, plen, peos, &complete, pheaders);
- }
- else if (APR_STATUS_IS_EOF(rv)) {
- apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
- APR_BRIGADE_INSERT_TAIL(stream->out_buffer, eos);
- *peos = 1;
- }
- else if (APR_STATUS_IS_EAGAIN(rv)) {
- /* we set this is the status of this call only if there
- * is no buffered data, see check below */
- }
- else {
- /* real error reading. Give this back directly, even though
- * we may have something buffered. */
- status = rv;
+ else {
+ H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre");
+ rv = h2_beam_receive(stream->output, stream->session->c1, stream->out_buffer,
+ APR_NONBLOCK_READ, stream->session->max_stream_mem - buf_len);
+ if (APR_SUCCESS != rv) {
+ if (APR_EAGAIN != rv) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c1,
+ H2_STRM_MSG(stream, "out_buffer, receive unsuccessful"));
+ }
}
}
-
- if (status == APR_SUCCESS) {
- if (*peos || *plen) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
- H2_STRM_MSG(stream, "prepare, len=%ld eos=%d"),
- (long)*plen, *peos);
- }
- else {
- status = (stream->output && h2_beam_is_closed(stream->output))? APR_EOF : APR_EAGAIN;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
- H2_STRM_MSG(stream, "prepare, no data"));
+
+ /* get rid of buckets we have no need for */
+ if (!APR_BRIGADE_EMPTY(stream->out_buffer)) {
+ b = APR_BRIGADE_FIRST(stream->out_buffer);
+ while (b != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
+ e = APR_BUCKET_NEXT(b);
+ if (APR_BUCKET_IS_METADATA(b)) {
+ if (APR_BUCKET_IS_FLUSH(b)) { /* we flush any c1 data already */
+ APR_BUCKET_REMOVE(b);
+ apr_bucket_destroy(b);
+ }
+ else if (APR_BUCKET_IS_EOS(b)) {
+ stream->output_eos = 1;
+ }
+ else if (AP_BUCKET_IS_ERROR(b)) {
+ stream_do_error_bucket(stream, b);
+ break;
+ }
+ }
+ else if (b->length == 0) { /* zero length data */
+ APR_BUCKET_REMOVE(b);
+ apr_bucket_destroy(b);
+ }
+ b = e;
}
}
- return status;
+ H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "out_buffer, after receive");
+
+cleanup:
+ return rv;
}
-static int is_not_headers(apr_bucket *b)
+static int bucket_pass_to_c1(apr_bucket *b)
{
- return !H2_BUCKET_IS_HEADERS(b);
+#if AP_HAS_RESPONSE_BUCKETS
+ return !AP_BUCKET_IS_RESPONSE(b)
+ && !AP_BUCKET_IS_HEADERS(b)
+ && !APR_BUCKET_IS_EOS(b);
+#else
+ return !H2_BUCKET_IS_HEADERS(b) && !APR_BUCKET_IS_EOS(b);
+#endif
}
apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb,
apr_off_t *plen, int *peos)
{
- conn_rec *c = stream->session->c;
- apr_status_t status = APR_SUCCESS;
+ apr_status_t rv = APR_SUCCESS;
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
if (stream->rst_error) {
return APR_ECONNRESET;
}
- status = h2_append_brigade(bb, stream->out_buffer, plen, peos, is_not_headers);
- if (status == APR_SUCCESS && !*peos && !*plen) {
- status = APR_EAGAIN;
+ rv = h2_append_brigade(bb, stream->out_buffer, plen, peos, bucket_pass_to_c1);
+ if (APR_SUCCESS == rv && !*peos && !*plen) {
+ rv = APR_EAGAIN;
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
- H2_STRM_MSG(stream, "read_to, len=%ld eos=%d"),
- (long)*plen, *peos);
- return status;
+ return rv;
}
+static apr_status_t stream_do_trailers(h2_stream *stream)
+{
+ conn_rec *c1 = stream->session->c1;
+ int ngrv;
+ h2_ngheader *nh = NULL;
+ apr_bucket *b, *e;
+#if AP_HAS_RESPONSE_BUCKETS
+ ap_bucket_headers *headers = NULL;
+#else
+ h2_headers *headers = NULL;
+#endif
+ apr_status_t rv;
+
+ ap_assert(stream->response);
+ ap_assert(stream->out_buffer);
+
+ b = APR_BRIGADE_FIRST(stream->out_buffer);
+ while (b != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
+ e = APR_BUCKET_NEXT(b);
+ if (APR_BUCKET_IS_METADATA(b)) {
+#if AP_HAS_RESPONSE_BUCKETS
+ if (AP_BUCKET_IS_HEADERS(b)) {
+ headers = b->data;
+#else /* AP_HAS_RESPONSE_BUCKETS */
+ if (H2_BUCKET_IS_HEADERS(b)) {
+ headers = h2_bucket_headers_get(b);
+#endif /* else AP_HAS_RESPONSE_BUCKETS */
+ APR_BUCKET_REMOVE(b);
+ apr_bucket_destroy(b);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c1,
+ H2_STRM_MSG(stream, "process trailers"));
+ break;
+ }
+ else if (APR_BUCKET_IS_EOS(b)) {
+ break;
+ }
+ }
+ else {
+ break;
+ }
+ b = e;
+ }
+ if (!headers) {
+ rv = APR_EAGAIN;
+ goto cleanup;
+ }
+
+ rv = h2_res_create_ngtrailer(&nh, stream->pool, headers);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, c1,
+ H2_STRM_LOG(APLOGNO(03072), stream, "submit %d trailers"),
+ (int)nh->nvlen);
+ if (APR_SUCCESS != rv) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, c1,
+ H2_STRM_LOG(APLOGNO(10024), stream, "invalid trailers"));
+ h2_stream_rst(stream, NGHTTP2_PROTOCOL_ERROR);
+ goto cleanup;
+ }
+
+ ngrv = nghttp2_submit_trailer(stream->session->ngh2, stream->id, nh->nv, nh->nvlen);
+ if (nghttp2_is_fatal(ngrv)) {
+ rv = APR_EGENERAL;
+ h2_session_dispatch_event(stream->session,
+ H2_SESSION_EV_PROTO_ERROR, ngrv, nghttp2_strerror(rv));
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, c1,
+ APLOGNO(02940) "submit_response: %s",
+ nghttp2_strerror(rv));
+ }
+ stream->sent_trailers = 1;
+
+cleanup:
+ return rv;
+}
+
+#if AP_HAS_RESPONSE_BUCKETS
+apr_status_t h2_stream_submit_pushes(h2_stream *stream, ap_bucket_response *response)
+#else
apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response)
+#endif
{
apr_status_t status = APR_SUCCESS;
apr_array_header_t *pushes;
int i;
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
pushes = h2_push_collect_update(stream, stream->request, response);
if (pushes && !apr_is_empty_array(pushes)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c1,
H2_STRM_MSG(stream, "found %d push candidates"),
pushes->nelts);
for (i = 0; i < pushes->nelts; ++i) {
@@ -977,17 +1229,24 @@ apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response)
apr_table_t *h2_stream_get_trailers(h2_stream *stream)
{
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
return NULL;
}
-const h2_priority *h2_stream_get_priority(h2_stream *stream,
+#if AP_HAS_RESPONSE_BUCKETS
+const h2_priority *h2_stream_get_priority(h2_stream *stream,
+ ap_bucket_response *response)
+#else
+const h2_priority *h2_stream_get_priority(h2_stream *stream,
h2_headers *response)
+#endif
{
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
if (response && stream->initiated_on) {
const char *ctype = apr_table_get(response->headers, "content-type");
if (ctype) {
/* FIXME: Not good enough, config needs to come from request->server */
- return h2_config_get_priority(stream->session->config, ctype);
+ return h2_cconfig_get_priority(stream->session->c1, ctype);
}
}
return NULL;
@@ -995,21 +1254,47 @@ const h2_priority *h2_stream_get_priority(h2_stream *stream,
int h2_stream_is_ready(h2_stream *stream)
{
- if (stream->has_response) {
+ /* Have we sent a response or do we have the response in our buffer? */
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
+ if (stream->response) {
return 1;
}
- else if (stream->out_buffer && get_first_headers_bucket(stream->out_buffer)) {
+ else if (stream->out_buffer && get_first_response_bucket(stream->out_buffer)) {
return 1;
}
return 0;
}
-int h2_stream_was_closed(const h2_stream *stream)
+int h2_stream_wants_send_data(h2_stream *stream)
{
- switch (stream->state) {
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
+ return h2_stream_is_ready(stream) &&
+ ((stream->out_buffer && !APR_BRIGADE_EMPTY(stream->out_buffer)) ||
+ (stream->output && !h2_beam_empty(stream->output)));
+}
+
+int h2_stream_is_at(const h2_stream *stream, h2_stream_state_t state)
+{
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
+ return stream->state == state;
+}
+
+int h2_stream_is_at_or_past(const h2_stream *stream, h2_stream_state_t state)
+{
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
+ switch (state) {
+ case H2_SS_IDLE:
+ return 1; /* by definition */
+ case H2_SS_RSVD_R: /*fall through*/
+ case H2_SS_RSVD_L: /*fall through*/
+ case H2_SS_OPEN:
+ return stream->state == state || stream->state >= H2_SS_OPEN;
+ case H2_SS_CLOSED_R: /*fall through*/
+ case H2_SS_CLOSED_L: /*fall through*/
case H2_SS_CLOSED:
+ return stream->state == state || stream->state >= H2_SS_CLOSED;
case H2_SS_CLEANUP:
- return 1;
+ return stream->state == state;
default:
return 0;
}
@@ -1019,6 +1304,7 @@ apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount)
{
h2_session *session = stream->session;
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
if (amount > 0) {
apr_off_t consumed = amount;
@@ -1066,13 +1352,477 @@ apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount)
nghttp2_session_set_local_window_size(session->ngh2,
NGHTTP2_FLAG_NONE, stream->id, win);
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_stream(%ld-%d): consumed %ld bytes, window now %d/%d",
- session->id, stream->id, (long)amount,
- cur_size, stream->in_window_size);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c1,
+ H2_STRM_MSG(stream, "consumed %ld bytes, window now %d/%d"),
+ (long)amount, cur_size, stream->in_window_size);
}
-#endif
+#endif /* #ifdef H2_NG2_LOCAL_WIN_SIZE */
}
return APR_SUCCESS;
}
+static apr_off_t output_data_buffered(h2_stream *stream, int *peos, int *pheader_blocked)
+{
+ /* How much data do we have in our buffers that we can write? */
+ apr_off_t buf_len = 0;
+ apr_bucket *b;
+
+ *peos = *pheader_blocked = 0;
+ if (stream->out_buffer) {
+ b = APR_BRIGADE_FIRST(stream->out_buffer);
+ while (b != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
+ if (APR_BUCKET_IS_METADATA(b)) {
+ if (APR_BUCKET_IS_EOS(b)) {
+ *peos = 1;
+ break;
+ }
+#if AP_HAS_RESPONSE_BUCKETS
+ else if (AP_BUCKET_IS_RESPONSE(b)) {
+ break;
+ }
+ else if (AP_BUCKET_IS_HEADERS(b)) {
+ *pheader_blocked = 1;
+ break;
+ }
+#else
+ else if (H2_BUCKET_IS_HEADERS(b)) {
+ *pheader_blocked = 1;
+ break;
+ }
+#endif
+ }
+ else {
+ buf_len += b->length;
+ }
+ b = APR_BUCKET_NEXT(b);
+ }
+ }
+ return buf_len;
+}
+
+static ssize_t stream_data_cb(nghttp2_session *ng2s,
+ int32_t stream_id,
+ uint8_t *buf,
+ size_t length,
+ uint32_t *data_flags,
+ nghttp2_data_source *source,
+ void *puser)
+{
+ h2_session *session = (h2_session *)puser;
+ conn_rec *c1 = session->c1;
+ apr_off_t buf_len;
+ int eos, header_blocked;
+ apr_status_t rv;
+ h2_stream *stream;
+
+ /* nghttp2 wants to send more DATA for the stream.
+ * we should have submitted the final response at this time
+ * after receiving output via stream_do_responses() */
+ ap_assert(session);
+ (void)ng2s;
+ (void)buf;
+ (void)source;
+ stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
+
+ if (!stream) {
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, c1,
+ APLOGNO(02937)
+ H2_SSSN_STRM_MSG(session, stream_id, "data_cb, stream not found"));
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
+ }
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
+ if (!stream->output || !stream->response || !stream->out_buffer) {
+ return NGHTTP2_ERR_DEFERRED;
+ }
+ if (stream->rst_error) {
+ return NGHTTP2_ERR_DEFERRED;
+ }
+ if (h2_c1_io_needs_flush(&session->io)) {
+ rv = h2_c1_io_pass(&session->io);
+ if (APR_STATUS_IS_EAGAIN(rv)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c1,
+ H2_SSSN_STRM_MSG(session, stream_id, "suspending on c1 out needs flush"));
+ h2_stream_dispatch(stream, H2_SEV_OUT_C1_BLOCK);
+ return NGHTTP2_ERR_DEFERRED;
+ }
+ else if (rv) {
+ h2_session_dispatch_event(session, H2_SESSION_EV_CONN_ERROR, rv, NULL);
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
+ }
+ }
+
+ /* determine how much we'd like to send. We cannot send more than
+ * is requested. But we can reduce the size in case the master
+ * connection operates in smaller chunks. (TSL warmup) */
+ if (stream->session->io.write_size > 0) {
+ apr_size_t chunk_len = stream->session->io.write_size - H2_FRAME_HDR_LEN;
+ if (length > chunk_len) {
+ length = chunk_len;
+ }
+ }
+ /* We allow configurable max DATA frame length. */
+ if (stream->session->max_data_frame_len > 0
+ && length > stream->session->max_data_frame_len) {
+ length = stream->session->max_data_frame_len;
+ }
+
+ /* How much data do we have in our buffers that we can write?
+ * if not enough, receive more. */
+ buf_len = output_data_buffered(stream, &eos, &header_blocked);
+ if (buf_len < (apr_off_t)length && !eos
+ && !header_blocked && !stream->rst_error) {
+ /* read more? */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c1,
+ H2_SSSN_STRM_MSG(session, stream_id,
+ "need more (read len=%ld, %ld in buffer)"),
+ (long)length, (long)buf_len);
+ rv = buffer_output_receive(stream);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c1,
+ H2_SSSN_STRM_MSG(session, stream_id,
+ "buffer_output_received"));
+ if (APR_STATUS_IS_EAGAIN(rv)) {
+ /* currently, no more is available */
+ }
+ else if (APR_SUCCESS == rv) {
+ /* got some, re-assess */
+ buf_len = output_data_buffered(stream, &eos, &header_blocked);
+ }
+ else if (APR_EOF == rv) {
+ if (!stream->output_eos) {
+ /* Seeing APR_EOF without an EOS bucket received before indicates
+ * that stream output is incomplete. Commonly, we expect to see
+ * an ERROR bucket to have been generated. But faulty handlers
+ * may not have generated one.
+ * We need to RST the stream bc otherwise the client thinks
+ * it is all fine. */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c1,
+ H2_SSSN_STRM_MSG(session, stream_id, "rst stream"));
+ h2_stream_rst(stream, H2_ERR_STREAM_CLOSED);
+ return NGHTTP2_ERR_DEFERRED;
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c1,
+ H2_SSSN_STRM_MSG(session, stream_id,
+ "eof on receive (read len=%ld, %ld in buffer)"),
+ (long)length, (long)buf_len);
+ eos = 1;
+ rv = APR_SUCCESS;
+ }
+ else if (APR_ECONNRESET == rv || APR_ECONNABORTED == rv) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, c1,
+ H2_STRM_LOG(APLOGNO(10471), stream, "data_cb, reading data"));
+ h2_stream_rst(stream, H2_ERR_STREAM_CLOSED);
+ return NGHTTP2_ERR_DEFERRED;
+ }
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, c1,
+ H2_STRM_LOG(APLOGNO(02938), stream, "data_cb, reading data"));
+ h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
+ return NGHTTP2_ERR_DEFERRED;
+ }
+ }
+
+ if (stream->rst_error) {
+ return NGHTTP2_ERR_DEFERRED;
+ }
+
+ if (buf_len == 0 && header_blocked) {
+ rv = stream_do_trailers(stream);
+ if (APR_SUCCESS != rv && !APR_STATUS_IS_EAGAIN(rv)) {
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, c1,
+ H2_STRM_LOG(APLOGNO(10300), stream,
+ "data_cb, error processing trailers"));
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
+ }
+ length = 0;
+ eos = 0;
+ }
+ else if (buf_len > (apr_off_t)length) {
+ eos = 0; /* Any EOS we have in the buffer does not apply yet */
+ }
+ else {
+ length = (size_t)buf_len;
+ }
+
+ if (length) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c1,
+ H2_STRM_MSG(stream, "data_cb, sending len=%ld, eos=%d"),
+ (long)length, eos);
+ *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
+ }
+ else if (!eos && !stream->sent_trailers) {
+ /* We have not reached the end of DATA yet, DEFER sending */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c1,
+ H2_STRM_LOG(APLOGNO(03071), stream, "data_cb, suspending"));
+ return NGHTTP2_ERR_DEFERRED;
+ }
+
+ if (eos) {
+ *data_flags |= NGHTTP2_DATA_FLAG_EOF;
+ }
+ return length;
+}
+
+static apr_status_t stream_do_response(h2_stream *stream)
+{
+ conn_rec *c1 = stream->session->c1;
+ apr_status_t rv = APR_EAGAIN;
+ int ngrv, is_empty = 0;
+ h2_ngheader *nh = NULL;
+ apr_bucket *b, *e;
+#if AP_HAS_RESPONSE_BUCKETS
+ ap_bucket_response *resp = NULL;
+#else
+ h2_headers *resp = NULL;
+#endif
+ nghttp2_data_provider provider, *pprovider = NULL;
+
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
+ ap_assert(!stream->response);
+ ap_assert(stream->out_buffer);
+
+ b = APR_BRIGADE_FIRST(stream->out_buffer);
+ while (b != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
+ e = APR_BUCKET_NEXT(b);
+ if (APR_BUCKET_IS_METADATA(b)) {
+#if AP_HAS_RESPONSE_BUCKETS
+ if (AP_BUCKET_IS_RESPONSE(b)) {
+ resp = b->data;
+#else /* AP_HAS_RESPONSE_BUCKETS */
+ if (H2_BUCKET_IS_HEADERS(b)) {
+ resp = h2_bucket_headers_get(b);
+#endif /* else AP_HAS_RESPONSE_BUCKETS */
+ APR_BUCKET_REMOVE(b);
+ apr_bucket_destroy(b);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c1,
+ H2_STRM_MSG(stream, "process response %d"),
+ resp->status);
+ is_empty = (e != APR_BRIGADE_SENTINEL(stream->out_buffer)
+ && APR_BUCKET_IS_EOS(e));
+ break;
+ }
+ else if (APR_BUCKET_IS_EOS(b)) {
+ h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
+ rv = APR_EINVAL;
+ goto cleanup;
+ }
+ else if (AP_BUCKET_IS_ERROR(b)) {
+ stream_do_error_bucket(stream, b);
+ rv = APR_EINVAL;
+ goto cleanup;
+ }
+ }
+ else {
+ /* data buckets before response headers, an error */
+ h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
+ rv = APR_EINVAL;
+ goto cleanup;
+ }
+ b = e;
+ }
+
+ if (!resp) {
+ rv = APR_EAGAIN;
+ goto cleanup;
+ }
+
+ if (resp->status < 100) {
+ h2_stream_rst(stream, resp->status);
+ goto cleanup;
+ }
+
+ if (resp->status == HTTP_FORBIDDEN && resp->notes) {
+ const char *cause = apr_table_get(resp->notes, "ssl-renegotiate-forbidden");
+ if (cause) {
+ /* This request triggered a TLS renegotiation that is not allowed
+ * in HTTP/2. Tell the client that it should use HTTP/1.1 for this.
+ */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, resp->status, c1,
+ H2_STRM_LOG(APLOGNO(03061), stream,
+ "renegotiate forbidden, cause: %s"), cause);
+ h2_stream_rst(stream, H2_ERR_HTTP_1_1_REQUIRED);
+ goto cleanup;
+ }
+ }
+
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c1,
+ H2_STRM_LOG(APLOGNO(03073), stream,
+ "submit response %d"), resp->status);
+
+ /* If this stream is not a pushed one itself,
+ * and HTTP/2 server push is enabled here,
+ * and the response HTTP status is not sth >= 400,
+ * and the remote side has pushing enabled,
+ * -> find and perform any pushes on this stream
+ * *before* we submit the stream response itself.
+ * This helps clients avoid opening new streams on Link
+ * resp that get pushed right afterwards.
+ *
+ * *) the response code is relevant, as we do not want to
+ * make pushes on 401 or 403 codes and friends.
+ * And if we see a 304, we do not push either
+ * as the client, having this resource in its cache, might
+ * also have the pushed ones as well.
+ */
+ if (!stream->initiated_on
+ && !stream->response
+ && stream->request && stream->request->method
+ && !strcmp("GET", stream->request->method)
+ && (resp->status < 400)
+ && (resp->status != 304)
+ && h2_session_push_enabled(stream->session)) {
+ /* PUSH is possible and enabled on server, unless the request
+ * denies it, submit resources to push */
+ const char *s = apr_table_get(resp->notes, H2_PUSH_MODE_NOTE);
+ if (!s || strcmp(s, "0")) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c1,
+ H2_STRM_MSG(stream, "submit pushes, note=%s"), s);
+ h2_stream_submit_pushes(stream, resp);
+ }
+ }
+
+ if (!stream->pref_priority) {
+ stream->pref_priority = h2_stream_get_priority(stream, resp);
+ }
+ h2_session_set_prio(stream->session, stream, stream->pref_priority);
+
+ if (resp->status == 103
+ && !h2_config_sgeti(stream->session->s, H2_CONF_EARLY_HINTS)) {
+ /* suppress sending this to the client, it might have triggered
+ * pushes and served its purpose nevertheless */
+ rv = APR_SUCCESS;
+ goto cleanup;
+ }
+ if (resp->status >= 200) {
+ stream->response = resp;
+ }
+
+ if (!is_empty) {
+ memset(&provider, 0, sizeof(provider));
+ provider.source.fd = stream->id;
+ provider.read_callback = stream_data_cb;
+ pprovider = &provider;
+ }
+
+ rv = h2_res_create_ngheader(&nh, stream->pool, resp);
+ if (APR_SUCCESS != rv) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, c1,
+ H2_STRM_LOG(APLOGNO(10025), stream, "invalid response"));
+ h2_stream_rst(stream, NGHTTP2_PROTOCOL_ERROR);
+ goto cleanup;
+ }
+
+ ngrv = nghttp2_submit_response(stream->session->ngh2, stream->id,
+ nh->nv, nh->nvlen, pprovider);
+ if (nghttp2_is_fatal(ngrv)) {
+ rv = APR_EGENERAL;
+ h2_session_dispatch_event(stream->session,
+ H2_SESSION_EV_PROTO_ERROR, ngrv, nghttp2_strerror(rv));
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, c1,
+ APLOGNO(10402) "submit_response: %s",
+ nghttp2_strerror(rv));
+ goto cleanup;
+ }
+
+ if (stream->initiated_on) {
+ ++stream->session->pushes_submitted;
+ }
+ else {
+ ++stream->session->responses_submitted;
+ }
+
+cleanup:
+ return rv;
+}
+
+static void stream_do_responses(h2_stream *stream)
+{
+ h2_session *session = stream->session;
+ conn_rec *c1 = session->c1;
+ apr_status_t rv;
+
+ ap_assert(!stream->response);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c1,
+ H2_STRM_MSG(stream, "do_response"));
+ rv = buffer_output_receive(stream);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c1,
+ H2_SSSN_STRM_MSG(session, stream->id,
+ "buffer_output_received2"));
+ if (APR_SUCCESS != rv && APR_EAGAIN != rv) {
+ h2_stream_rst(stream, NGHTTP2_PROTOCOL_ERROR);
+ }
+ else {
+ /* process all headers sitting at the buffer head. */
+ do {
+ rv = stream_do_response(stream);
+ } while (APR_SUCCESS == rv
+ && !stream->rst_error
+ && !stream->response);
+ }
+}
+
+void h2_stream_on_output_change(h2_stream *stream)
+{
+ conn_rec *c1 = stream->session->c1;
+ apr_status_t rv = APR_EAGAIN;
+
+ /* stream->pout_recv_write signalled a change. Check what has happend, read
+ * from it and act on seeing a response/data. */
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
+ if (!stream->output) {
+ /* c2 has not assigned the output beam to the stream (yet). */
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c1,
+ H2_STRM_MSG(stream, "read_output, no output beam registered"));
+ }
+ else if (h2_stream_is_at_or_past(stream, H2_SS_CLOSED)) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, c1,
+ H2_STRM_LOG(APLOGNO(10301), stream, "already closed"));
+ }
+ else if (h2_stream_is_at(stream, H2_SS_CLOSED_L)) {
+ /* We have delivered a response to a stream that was not closed
+ * by the client. This could be a POST with body that we negate
+ * and we need to RST_STREAM to end if. */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c1,
+ H2_STRM_LOG(APLOGNO(10313), stream, "remote close missing"));
+ h2_stream_rst(stream, H2_ERR_NO_ERROR);
+ }
+ else {
+ /* stream is not closed, a change in output happened. There are
+ * two modes of operation here:
+ * 1) the final response has been submitted. nghttp2 is invoking
+ * stream_data_cb() to progress the stream. This handles DATA,
+ * trailers, EOS and ERRORs.
+ * When stream_data_cb() runs out of things to send, it returns
+ * NGHTTP2_ERR_DEFERRED and nghttp2 *suspends* further processing
+ * until we tell it to resume.
+ * 2) We have not seen the *final* response yet. The stream can not
+ * send any response DATA. The nghttp2 stream_data_cb() is not
+ * invoked. We need to receive output, expecting not DATA but
+ * RESPONSEs (intermediate may arrive) and submit those. On
+ * the final response, nghttp2 will start calling stream_data_cb().
+ */
+ if (stream->response) {
+ nghttp2_session_resume_data(stream->session->ngh2, stream->id);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c1,
+ H2_STRM_MSG(stream, "resumed"));
+ }
+ else {
+ stream_do_responses(stream);
+ if (!stream->rst_error) {
+ nghttp2_session_resume_data(stream->session->ngh2, stream->id);
+ }
+ }
+ }
+}
+
+void h2_stream_on_input_change(h2_stream *stream)
+{
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
+ ap_assert(stream->input);
+ h2_beam_report_consumption(stream->input);
+ if (h2_stream_is_at(stream, H2_SS_CLOSED_L)
+ && !h2_mplx_c1_stream_is_running(stream->session->mplx, stream)) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c1,
+ H2_STRM_LOG(APLOGNO(10026), stream, "remote close missing"));
+ h2_stream_rst(stream, H2_ERR_NO_ERROR);
+ }
+}