diff options
Diffstat (limited to 'modules/http2/h2_bucket_beam.c')
-rw-r--r-- | modules/http2/h2_bucket_beam.c | 1470 |
1 files changed, 535 insertions, 935 deletions
diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c index f79cbe3..6978254 100644 --- a/modules/http2/h2_bucket_beam.c +++ b/modules/http2/h2_bucket_beam.c @@ -24,261 +24,123 @@ #include <httpd.h> #include <http_protocol.h> +#include <http_request.h> #include <http_log.h> #include "h2_private.h" +#include "h2_conn_ctx.h" +#include "h2_headers.h" #include "h2_util.h" #include "h2_bucket_beam.h" -static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy); -#define H2_BPROXY_NEXT(e) APR_RING_NEXT((e), link) -#define H2_BPROXY_PREV(e) APR_RING_PREV((e), link) -#define H2_BPROXY_REMOVE(e) APR_RING_REMOVE((e), link) - -#define H2_BPROXY_LIST_INIT(b) APR_RING_INIT(&(b)->list, h2_beam_proxy, link); -#define H2_BPROXY_LIST_SENTINEL(b) APR_RING_SENTINEL(&(b)->list, h2_beam_proxy, link) -#define H2_BPROXY_LIST_EMPTY(b) APR_RING_EMPTY(&(b)->list, h2_beam_proxy, link) -#define H2_BPROXY_LIST_FIRST(b) APR_RING_FIRST(&(b)->list) -#define H2_BPROXY_LIST_LAST(b) APR_RING_LAST(&(b)->list) -#define H2_PROXY_BLIST_INSERT_HEAD(b, e) do { \ - h2_beam_proxy *ap__b = (e); \ - APR_RING_INSERT_HEAD(&(b)->list, ap__b, h2_beam_proxy, link); \ +#define H2_BLIST_INIT(b) APR_RING_INIT(&(b)->list, apr_bucket, link); +#define H2_BLIST_SENTINEL(b) APR_RING_SENTINEL(&(b)->list, apr_bucket, link) +#define H2_BLIST_EMPTY(b) APR_RING_EMPTY(&(b)->list, apr_bucket, link) +#define H2_BLIST_FIRST(b) APR_RING_FIRST(&(b)->list) +#define H2_BLIST_LAST(b) APR_RING_LAST(&(b)->list) +#define H2_BLIST_INSERT_HEAD(b, e) do { \ + apr_bucket *ap__b = (e); \ + APR_RING_INSERT_HEAD(&(b)->list, ap__b, apr_bucket, link); \ } while (0) -#define H2_BPROXY_LIST_INSERT_TAIL(b, e) do { \ - h2_beam_proxy *ap__b = (e); \ - APR_RING_INSERT_TAIL(&(b)->list, ap__b, h2_beam_proxy, link); \ +#define H2_BLIST_INSERT_TAIL(b, e) do { \ + apr_bucket *ap__b = (e); \ + APR_RING_INSERT_TAIL(&(b)->list, ap__b, apr_bucket, link); \ } while (0) -#define H2_BPROXY_LIST_CONCAT(a, b) do { \ - APR_RING_CONCAT(&(a)->list, &(b)->list, h2_beam_proxy, link); \ +#define H2_BLIST_CONCAT(a, b) do { \ + APR_RING_CONCAT(&(a)->list, &(b)->list, apr_bucket, link); \ } while (0) -#define H2_BPROXY_LIST_PREPEND(a, b) do { \ - APR_RING_PREPEND(&(a)->list, &(b)->list, h2_beam_proxy, link); \ +#define H2_BLIST_PREPEND(a, b) do { \ + APR_RING_PREPEND(&(a)->list, &(b)->list, apr_bucket, link); \ } while (0) -/******************************************************************************* - * beam bucket with reference to beam and bucket it represents - ******************************************************************************/ - -const apr_bucket_type_t h2_bucket_type_beam; - -#define H2_BUCKET_IS_BEAM(e) (e->type == &h2_bucket_type_beam) - -struct h2_beam_proxy { - apr_bucket_refcount refcount; - APR_RING_ENTRY(h2_beam_proxy) link; - h2_bucket_beam *beam; - apr_bucket *bsender; - apr_size_t n; -}; - -static const char Dummy = '\0'; - -static apr_status_t beam_bucket_read(apr_bucket *b, const char **str, - apr_size_t *len, apr_read_type_e block) -{ - h2_beam_proxy *d = b->data; - if (d->bsender) { - const char *data; - apr_status_t status = apr_bucket_read(d->bsender, &data, len, block); - if (status == APR_SUCCESS) { - *str = data + b->start; - *len = b->length; - } - return status; - } - *str = &Dummy; - *len = 0; - return APR_ECONNRESET; -} - -static void beam_bucket_destroy(void *data) -{ - h2_beam_proxy *d = data; - - if (apr_bucket_shared_destroy(d)) { - /* When the beam gets destroyed before this bucket, it will - * NULLify its reference here. This is not protected by a mutex, - * so it will not help with race conditions. - * But it lets us shut down memory pool with circulare beam - * references. */ - if (d->beam) { - h2_beam_emitted(d->beam, d); - } - apr_bucket_free(d); - } -} - -static apr_bucket * h2_beam_bucket_make(apr_bucket *b, - h2_bucket_beam *beam, - apr_bucket *bsender, apr_size_t n) -{ - h2_beam_proxy *d; - - d = apr_bucket_alloc(sizeof(*d), b->list); - H2_BPROXY_LIST_INSERT_TAIL(&beam->proxies, d); - d->beam = beam; - d->bsender = bsender; - d->n = n; - - b = apr_bucket_shared_make(b, d, 0, bsender? bsender->length : 0); - b->type = &h2_bucket_type_beam; - - return b; -} - -static apr_bucket *h2_beam_bucket_create(h2_bucket_beam *beam, - apr_bucket *bsender, - apr_bucket_alloc_t *list, - apr_size_t n) -{ - apr_bucket *b = apr_bucket_alloc(sizeof(*b), list); - - APR_BUCKET_INIT(b); - b->free = apr_bucket_free; - b->list = list; - return h2_beam_bucket_make(b, beam, bsender, n); -} - -const apr_bucket_type_t h2_bucket_type_beam = { - "BEAM", 5, APR_BUCKET_DATA, - beam_bucket_destroy, - beam_bucket_read, - apr_bucket_setaside_noop, - apr_bucket_shared_split, - apr_bucket_shared_copy -}; - -/******************************************************************************* - * h2_blist, a brigade without allocations - ******************************************************************************/ - -static apr_array_header_t *beamers; +static int buffer_is_empty(h2_bucket_beam *beam); +static apr_off_t get_buffered_data_len(h2_bucket_beam *beam); -static apr_status_t cleanup_beamers(void *dummy) +static int h2_blist_count(h2_blist *blist) { - (void)dummy; - beamers = NULL; - return APR_SUCCESS; -} - -void h2_register_bucket_beamer(h2_bucket_beamer *beamer) -{ - if (!beamers) { - apr_pool_cleanup_register(apr_hook_global_pool, NULL, - cleanup_beamers, apr_pool_cleanup_null); - beamers = apr_array_make(apr_hook_global_pool, 10, - sizeof(h2_bucket_beamer*)); - } - APR_ARRAY_PUSH(beamers, h2_bucket_beamer*) = beamer; -} - -static apr_bucket *h2_beam_bucket(h2_bucket_beam *beam, - apr_bucket_brigade *dest, - const apr_bucket *src) -{ - apr_bucket *b = NULL; - int i; - if (beamers) { - for (i = 0; i < beamers->nelts && b == NULL; ++i) { - h2_bucket_beamer *beamer; - - beamer = APR_ARRAY_IDX(beamers, i, h2_bucket_beamer*); - b = beamer(beam, dest, src); - } - } - return b; -} - - -/******************************************************************************* - * bucket beam that can transport buckets across threads - ******************************************************************************/ - -static void mutex_leave(void *ctx, apr_thread_mutex_t *lock) -{ - apr_thread_mutex_unlock(lock); -} + apr_bucket *b; + int count = 0; -static apr_status_t mutex_enter(void *ctx, h2_beam_lock *pbl) -{ - h2_bucket_beam *beam = ctx; - pbl->mutex = beam->lock; - pbl->leave = mutex_leave; - return apr_thread_mutex_lock(pbl->mutex); -} + for (b = H2_BLIST_FIRST(blist); b != H2_BLIST_SENTINEL(blist); + b = APR_BUCKET_NEXT(b)) { + ++count; + } + return count; +} + +#define H2_BEAM_LOG(beam, c, level, rv, msg, bb) \ + do { \ + if (APLOG_C_IS_LEVEL((c),(level))) { \ + char buffer[4 * 1024]; \ + apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]); \ + len = bb? h2_util_bb_print(buffer, bmax, "", "", bb) : 0; \ + ap_log_cerror(APLOG_MARK, (level), rv, (c), \ + "BEAM[%s,%s%sdata=%ld,buckets(send/consumed)=%d/%d]: %s %s", \ + (beam)->name, \ + (beam)->aborted? "aborted," : "", \ + buffer_is_empty(beam)? "empty," : "", \ + (long)get_buffered_data_len(beam), \ + h2_blist_count(&(beam)->buckets_to_send), \ + h2_blist_count(&(beam)->buckets_consumed), \ + (msg), len? buffer : ""); \ + } \ + } while (0) -static apr_status_t enter_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl) -{ - return mutex_enter(beam, pbl); -} -static void leave_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl) +static int bucket_is_mmap(apr_bucket *b) { - if (pbl->leave) { - pbl->leave(pbl->leave_ctx, pbl->mutex); - } +#if APR_HAS_MMAP + return APR_BUCKET_IS_MMAP(b); +#else + /* if it is not defined as enabled, it should always be no */ + return 0; +#endif } static apr_off_t bucket_mem_used(apr_bucket *b) { - if (APR_BUCKET_IS_FILE(b)) { + if (APR_BUCKET_IS_FILE(b) || bucket_is_mmap(b)) { return 0; } else { /* should all have determinate length */ - return b->length; + return (apr_off_t)b->length; } } -static int report_consumption(h2_bucket_beam *beam, h2_beam_lock *pbl) +static int report_consumption(h2_bucket_beam *beam, int locked) { int rv = 0; - apr_off_t len = beam->received_bytes - beam->cons_bytes_reported; + apr_off_t len = beam->recv_bytes - beam->recv_bytes_reported; h2_beam_io_callback *cb = beam->cons_io_cb; if (len > 0) { if (cb) { void *ctx = beam->cons_ctx; - if (pbl) leave_yellow(beam, pbl); + if (locked) apr_thread_mutex_unlock(beam->lock); cb(ctx, beam, len); - if (pbl) enter_yellow(beam, pbl); + if (locked) apr_thread_mutex_lock(beam->lock); rv = 1; } - beam->cons_bytes_reported += len; + beam->recv_bytes_reported += len; } return rv; } -static void report_prod_io(h2_bucket_beam *beam, int force, h2_beam_lock *pbl) -{ - apr_off_t len = beam->sent_bytes - beam->prod_bytes_reported; - if (force || len > 0) { - h2_beam_io_callback *cb = beam->prod_io_cb; - if (cb) { - void *ctx = beam->prod_ctx; - - leave_yellow(beam, pbl); - cb(ctx, beam, len); - enter_yellow(beam, pbl); - } - beam->prod_bytes_reported += len; - } -} - static apr_size_t calc_buffered(h2_bucket_beam *beam) { apr_size_t len = 0; apr_bucket *b; - for (b = H2_BLIST_FIRST(&beam->send_list); - b != H2_BLIST_SENTINEL(&beam->send_list); + for (b = H2_BLIST_FIRST(&beam->buckets_to_send); + b != H2_BLIST_SENTINEL(&beam->buckets_to_send); b = APR_BUCKET_NEXT(b)) { if (b->length == ((apr_size_t)-1)) { /* do not count */ } - else if (APR_BUCKET_IS_FILE(b)) { + else if (APR_BUCKET_IS_FILE(b) || bucket_is_mmap(b)) { /* if unread, has no real mem footprint. */ } else { @@ -288,13 +150,30 @@ static apr_size_t calc_buffered(h2_bucket_beam *beam) return len; } -static void r_purge_sent(h2_bucket_beam *beam) +static void purge_consumed_buckets(h2_bucket_beam *beam) +{ + apr_bucket *b; + /* delete all sender buckets in purge brigade, needs to be called + * from sender thread only */ + while (!H2_BLIST_EMPTY(&beam->buckets_consumed)) { + b = H2_BLIST_FIRST(&beam->buckets_consumed); + if(AP_BUCKET_IS_EOR(b)) { + APR_BUCKET_REMOVE(b); + H2_BLIST_INSERT_TAIL(&beam->buckets_eor, b); + } + else { + apr_bucket_delete(b); + } + } +} + +static void purge_eor_buckets(h2_bucket_beam *beam) { apr_bucket *b; /* delete all sender buckets in purge brigade, needs to be called * from sender thread only */ - while (!H2_BLIST_EMPTY(&beam->purge_list)) { - b = H2_BLIST_FIRST(&beam->purge_list); + while (!H2_BLIST_EMPTY(&beam->buckets_eor)) { + b = H2_BLIST_FIRST(&beam->buckets_eor); apr_bucket_delete(b); } } @@ -302,7 +181,7 @@ static void r_purge_sent(h2_bucket_beam *beam) static apr_size_t calc_space_left(h2_bucket_beam *beam) { if (beam->max_buf_size > 0) { - apr_off_t len = calc_buffered(beam); + apr_size_t len = calc_buffered(beam); return (beam->max_buf_size > len? (beam->max_buf_size - len) : 0); } return APR_SIZE_MAX; @@ -310,31 +189,10 @@ static apr_size_t calc_space_left(h2_bucket_beam *beam) static int buffer_is_empty(h2_bucket_beam *beam) { - return ((!beam->recv_buffer || APR_BRIGADE_EMPTY(beam->recv_buffer)) - && H2_BLIST_EMPTY(&beam->send_list)); + return H2_BLIST_EMPTY(&beam->buckets_to_send); } -static apr_status_t wait_empty(h2_bucket_beam *beam, apr_read_type_e block, - apr_thread_mutex_t *lock) -{ - apr_status_t rv = APR_SUCCESS; - - while (!buffer_is_empty(beam) && APR_SUCCESS == rv) { - if (APR_BLOCK_READ != block || !lock) { - rv = APR_EAGAIN; - } - else if (beam->timeout > 0) { - rv = apr_thread_cond_timedwait(beam->change, lock, beam->timeout); - } - else { - rv = apr_thread_cond_wait(beam->change, lock); - } - } - return rv; -} - -static apr_status_t wait_not_empty(h2_bucket_beam *beam, apr_read_type_e block, - apr_thread_mutex_t *lock) +static apr_status_t wait_not_empty(h2_bucket_beam *beam, conn_rec *c, apr_read_type_e block) { apr_status_t rv = APR_SUCCESS; @@ -345,21 +203,24 @@ static apr_status_t wait_not_empty(h2_bucket_beam *beam, apr_read_type_e block, else if (beam->closed) { rv = APR_EOF; } - else if (APR_BLOCK_READ != block || !lock) { + else if (APR_BLOCK_READ != block) { rv = APR_EAGAIN; } else if (beam->timeout > 0) { - rv = apr_thread_cond_timedwait(beam->change, lock, beam->timeout); + H2_BEAM_LOG(beam, c, APLOG_TRACE2, rv, "wait_not_empty, timeout", NULL); + rv = apr_thread_cond_timedwait(beam->change, beam->lock, beam->timeout); } else { - rv = apr_thread_cond_wait(beam->change, lock); + H2_BEAM_LOG(beam, c, APLOG_TRACE2, rv, "wait_not_empty, forever", NULL); + rv = apr_thread_cond_wait(beam->change, beam->lock); } } return rv; } -static apr_status_t wait_not_full(h2_bucket_beam *beam, apr_read_type_e block, - apr_size_t *pspace_left, h2_beam_lock *bl) +static apr_status_t wait_not_full(h2_bucket_beam *beam, conn_rec *c, + apr_read_type_e block, + apr_size_t *pspace_left) { apr_status_t rv = APR_SUCCESS; apr_size_t left; @@ -368,15 +229,17 @@ static apr_status_t wait_not_full(h2_bucket_beam *beam, apr_read_type_e block, if (beam->aborted) { rv = APR_ECONNABORTED; } - else if (block != APR_BLOCK_READ || !bl->mutex) { + else if (block != APR_BLOCK_READ) { rv = APR_EAGAIN; } else { if (beam->timeout > 0) { - rv = apr_thread_cond_timedwait(beam->change, bl->mutex, beam->timeout); + H2_BEAM_LOG(beam, c, APLOG_TRACE2, rv, "wait_not_full, timeout", NULL); + rv = apr_thread_cond_timedwait(beam->change, beam->lock, beam->timeout); } else { - rv = apr_thread_cond_wait(beam->change, bl->mutex); + H2_BEAM_LOG(beam, c, APLOG_TRACE2, rv, "wait_not_full, forever", NULL); + rv = apr_thread_cond_wait(beam->change, beam->lock); } } } @@ -384,73 +247,6 @@ static apr_status_t wait_not_full(h2_bucket_beam *beam, apr_read_type_e block, return rv; } -static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy) -{ - h2_beam_lock bl; - apr_bucket *b, *next; - - if (enter_yellow(beam, &bl) == APR_SUCCESS) { - /* even when beam buckets are split, only the one where - * refcount drops to 0 will call us */ - H2_BPROXY_REMOVE(proxy); - /* invoked from receiver thread, the last beam bucket for the send - * bucket is about to be destroyed. - * remove it from the hold, where it should be now */ - if (proxy->bsender) { - for (b = H2_BLIST_FIRST(&beam->hold_list); - b != H2_BLIST_SENTINEL(&beam->hold_list); - b = APR_BUCKET_NEXT(b)) { - if (b == proxy->bsender) { - break; - } - } - if (b != H2_BLIST_SENTINEL(&beam->hold_list)) { - /* bucket is in hold as it should be, mark this one - * and all before it for purging. We might have placed meta - * buckets without a receiver proxy into the hold before it - * and schedule them for purging now */ - for (b = H2_BLIST_FIRST(&beam->hold_list); - b != H2_BLIST_SENTINEL(&beam->hold_list); - b = next) { - next = APR_BUCKET_NEXT(b); - if (b == proxy->bsender) { - APR_BUCKET_REMOVE(b); - H2_BLIST_INSERT_TAIL(&beam->purge_list, b); - break; - } - else if (APR_BUCKET_IS_METADATA(b)) { - APR_BUCKET_REMOVE(b); - H2_BLIST_INSERT_TAIL(&beam->purge_list, b); - } - else { - /* another data bucket before this one in hold. this - * is normal since DATA buckets need not be destroyed - * in order */ - } - } - - proxy->bsender = NULL; - } - else { - /* it should be there unless we screwed up */ - ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->send_pool, - APLOGNO(03384) "h2_beam(%d-%s): emitted bucket not " - "in hold, n=%d", beam->id, beam->tag, - (int)proxy->n); - ap_assert(!proxy->bsender); - } - } - /* notify anyone waiting on space to become available */ - if (!bl.mutex) { - r_purge_sent(beam); - } - else { - apr_thread_cond_broadcast(beam->change); - } - leave_yellow(beam, &bl); - } -} - static void h2_blist_cleanup(h2_blist *bl) { apr_bucket *e; @@ -461,335 +257,203 @@ static void h2_blist_cleanup(h2_blist *bl) } } -static apr_status_t beam_close(h2_bucket_beam *beam) +static void beam_shutdown(h2_bucket_beam *beam, apr_shutdown_how_e how) { - if (!beam->closed) { - beam->closed = 1; - apr_thread_cond_broadcast(beam->change); + if (!beam->pool) { + /* pool being cleared already */ + return; } - return APR_SUCCESS; -} - -int h2_beam_is_closed(h2_bucket_beam *beam) -{ - return beam->closed; -} -static int pool_register(h2_bucket_beam *beam, apr_pool_t *pool, - apr_status_t (*cleanup)(void *)) -{ - if (pool && pool != beam->pool) { - apr_pool_pre_cleanup_register(pool, beam, cleanup); - return 1; + /* shutdown both receiver and sender? */ + if (how == APR_SHUTDOWN_READWRITE) { + beam->cons_io_cb = NULL; + beam->recv_cb = NULL; + beam->eagain_cb = NULL; } - return 0; -} -static int pool_kill(h2_bucket_beam *beam, apr_pool_t *pool, - apr_status_t (*cleanup)(void *)) { - if (pool && pool != beam->pool) { - apr_pool_cleanup_kill(pool, beam, cleanup); - return 1; + /* shutdown sender (or both)? */ + if (how != APR_SHUTDOWN_READ) { + purge_consumed_buckets(beam); + h2_blist_cleanup(&beam->buckets_to_send); } - return 0; } -static apr_status_t beam_recv_cleanup(void *data) +static apr_status_t beam_cleanup(void *data) { h2_bucket_beam *beam = data; - /* receiver pool has gone away, clear references */ - beam->recv_buffer = NULL; - beam->recv_pool = NULL; + beam_shutdown(beam, APR_SHUTDOWN_READWRITE); + purge_eor_buckets(beam); + beam->pool = NULL; /* the pool is clearing now */ return APR_SUCCESS; } -static apr_status_t beam_send_cleanup(void *data) +apr_status_t h2_beam_destroy(h2_bucket_beam *beam, conn_rec *c) { - h2_bucket_beam *beam = data; - /* sender is going away, clear up all references to its memory */ - r_purge_sent(beam); - h2_blist_cleanup(&beam->send_list); - report_consumption(beam, NULL); - while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) { - h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies); - H2_BPROXY_REMOVE(proxy); - proxy->beam = NULL; - proxy->bsender = NULL; + if (beam->pool) { + H2_BEAM_LOG(beam, c, APLOG_TRACE2, 0, "destroy", NULL); + apr_pool_cleanup_run(beam->pool, beam, beam_cleanup); } - h2_blist_cleanup(&beam->purge_list); - h2_blist_cleanup(&beam->hold_list); - beam->send_pool = NULL; + H2_BEAM_LOG(beam, c, APLOG_TRACE2, 0, "destroyed", NULL); return APR_SUCCESS; } -static void beam_set_send_pool(h2_bucket_beam *beam, apr_pool_t *pool) -{ - if (beam->send_pool != pool) { - if (beam->send_pool && beam->send_pool != beam->pool) { - pool_kill(beam, beam->send_pool, beam_send_cleanup); - beam_send_cleanup(beam); - } - beam->send_pool = pool; - pool_register(beam, beam->send_pool, beam_send_cleanup); - } -} - -static void recv_buffer_cleanup(h2_bucket_beam *beam, h2_beam_lock *bl) -{ - if (beam->recv_buffer && !APR_BRIGADE_EMPTY(beam->recv_buffer)) { - apr_bucket_brigade *bb = beam->recv_buffer; - apr_off_t bblen = 0; - - beam->recv_buffer = NULL; - apr_brigade_length(bb, 0, &bblen); - beam->received_bytes += bblen; - - /* need to do this unlocked since bucket destroy might - * call this beam again. */ - if (bl) leave_yellow(beam, bl); - apr_brigade_destroy(bb); - if (bl) enter_yellow(beam, bl); - - apr_thread_cond_broadcast(beam->change); - if (beam->cons_ev_cb) { - beam->cons_ev_cb(beam->cons_ctx, beam); - } - } -} - -static apr_status_t beam_cleanup(h2_bucket_beam *beam, int from_pool) -{ - apr_status_t status = APR_SUCCESS; - int safe_send = (beam->owner == H2_BEAM_OWNER_SEND); - int safe_recv = (beam->owner == H2_BEAM_OWNER_RECV); - - /* - * Owner of the beam is going away, depending on which side it owns, - * cleanup strategies will differ. - * - * In general, receiver holds references to memory from sender. - * Clean up receiver first, if safe, then cleanup sender, if safe. - */ - - /* When called from pool destroy, io callbacks are disabled */ - if (from_pool) { - beam->cons_io_cb = NULL; - } - - /* When modify send is not safe, this means we still have multi-thread - * protection and the owner is receiving the buckets. If the sending - * side has not gone away, this means we could have dangling buckets - * in our lists that never get destroyed. This should not happen. */ - ap_assert(safe_send || !beam->send_pool); - if (!H2_BLIST_EMPTY(&beam->send_list)) { - ap_assert(beam->send_pool); - } - - if (safe_recv) { - if (beam->recv_pool) { - pool_kill(beam, beam->recv_pool, beam_recv_cleanup); - beam->recv_pool = NULL; - } - recv_buffer_cleanup(beam, NULL); - } - else { - beam->recv_buffer = NULL; - beam->recv_pool = NULL; - } - - if (safe_send && beam->send_pool) { - pool_kill(beam, beam->send_pool, beam_send_cleanup); - status = beam_send_cleanup(beam); - } - - if (safe_recv) { - ap_assert(H2_BPROXY_LIST_EMPTY(&beam->proxies)); - ap_assert(H2_BLIST_EMPTY(&beam->send_list)); - ap_assert(H2_BLIST_EMPTY(&beam->hold_list)); - ap_assert(H2_BLIST_EMPTY(&beam->purge_list)); - } - return status; -} - -static apr_status_t beam_pool_cleanup(void *data) -{ - return beam_cleanup(data, 1); -} - -apr_status_t h2_beam_destroy(h2_bucket_beam *beam) -{ - apr_pool_cleanup_kill(beam->pool, beam, beam_pool_cleanup); - return beam_cleanup(beam, 0); -} - -apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool, - int id, const char *tag, - h2_beam_owner_t owner, +apr_status_t h2_beam_create(h2_bucket_beam **pbeam, conn_rec *from, + apr_pool_t *pool, int id, const char *tag, apr_size_t max_buf_size, apr_interval_time_t timeout) { h2_bucket_beam *beam; - apr_status_t rv = APR_SUCCESS; + h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(from); + apr_status_t rv; beam = apr_pcalloc(pool, sizeof(*beam)); - if (!beam) { - return APR_ENOMEM; - } - - beam->id = id; - beam->tag = tag; beam->pool = pool; - beam->owner = owner; - H2_BLIST_INIT(&beam->send_list); - H2_BLIST_INIT(&beam->hold_list); - H2_BLIST_INIT(&beam->purge_list); - H2_BPROXY_LIST_INIT(&beam->proxies); + beam->from = from; + beam->id = id; + beam->name = apr_psprintf(pool, "%s-%d-%s", + conn_ctx->id, id, tag); + + H2_BLIST_INIT(&beam->buckets_to_send); + H2_BLIST_INIT(&beam->buckets_consumed); + H2_BLIST_INIT(&beam->buckets_eor); beam->tx_mem_limits = 1; beam->max_buf_size = max_buf_size; beam->timeout = timeout; rv = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT, pool); - if (APR_SUCCESS == rv) { - rv = apr_thread_cond_create(&beam->change, pool); - if (APR_SUCCESS == rv) { - apr_pool_pre_cleanup_register(pool, beam, beam_pool_cleanup); - *pbeam = beam; - } - } + if (APR_SUCCESS != rv) goto cleanup; + rv = apr_thread_cond_create(&beam->change, pool); + if (APR_SUCCESS != rv) goto cleanup; + apr_pool_pre_cleanup_register(pool, beam, beam_cleanup); + +cleanup: + H2_BEAM_LOG(beam, from, APLOG_TRACE2, rv, "created", NULL); + *pbeam = (APR_SUCCESS == rv)? beam : NULL; return rv; } void h2_beam_buffer_size_set(h2_bucket_beam *beam, apr_size_t buffer_size) { - h2_beam_lock bl; - - if (enter_yellow(beam, &bl) == APR_SUCCESS) { - beam->max_buf_size = buffer_size; - leave_yellow(beam, &bl); - } + apr_thread_mutex_lock(beam->lock); + beam->max_buf_size = buffer_size; + apr_thread_mutex_unlock(beam->lock); } -apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam) +void h2_beam_set_copy_files(h2_bucket_beam * beam, int enabled) { - h2_beam_lock bl; - apr_size_t buffer_size = 0; - - if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) { - buffer_size = beam->max_buf_size; - leave_yellow(beam, &bl); - } - return buffer_size; + apr_thread_mutex_lock(beam->lock); + beam->copy_files = enabled; + apr_thread_mutex_unlock(beam->lock); } -void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout) +apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam) { - h2_beam_lock bl; + apr_size_t buffer_size = 0; - if (enter_yellow(beam, &bl) == APR_SUCCESS) { - beam->timeout = timeout; - leave_yellow(beam, &bl); - } + apr_thread_mutex_lock(beam->lock); + buffer_size = beam->max_buf_size; + apr_thread_mutex_unlock(beam->lock); + return buffer_size; } apr_interval_time_t h2_beam_timeout_get(h2_bucket_beam *beam) { - h2_beam_lock bl; - apr_interval_time_t timeout = 0; - - if (enter_yellow(beam, &bl) == APR_SUCCESS) { - timeout = beam->timeout; - leave_yellow(beam, &bl); - } + apr_interval_time_t timeout; + + apr_thread_mutex_lock(beam->lock); + timeout = beam->timeout; + apr_thread_mutex_unlock(beam->lock); return timeout; } -void h2_beam_abort(h2_bucket_beam *beam) +void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout) { - h2_beam_lock bl; - - if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) { - beam->aborted = 1; - r_purge_sent(beam); - h2_blist_cleanup(&beam->send_list); - report_consumption(beam, &bl); - apr_thread_cond_broadcast(beam->change); - leave_yellow(beam, &bl); - } + apr_thread_mutex_lock(beam->lock); + beam->timeout = timeout; + apr_thread_mutex_unlock(beam->lock); } -apr_status_t h2_beam_close(h2_bucket_beam *beam) +void h2_beam_abort(h2_bucket_beam *beam, conn_rec *c) { - h2_beam_lock bl; - - if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) { - r_purge_sent(beam); - beam_close(beam); - report_consumption(beam, &bl); - leave_yellow(beam, &bl); - } - return beam->aborted? APR_ECONNABORTED : APR_SUCCESS; -} + apr_thread_mutex_lock(beam->lock); + beam->aborted = 1; + if (c == beam->from) { + /* sender aborts */ + if (beam->send_cb) { + beam->send_cb(beam->send_ctx, beam); + } + if (beam->was_empty_cb && buffer_is_empty(beam)) { + beam->was_empty_cb(beam->was_empty_ctx, beam); + } + /* no more consumption reporting to sender */ + report_consumption(beam, 1); + beam->cons_ctx = NULL; -apr_status_t h2_beam_leave(h2_bucket_beam *beam) -{ - h2_beam_lock bl; - - if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) { - recv_buffer_cleanup(beam, &bl); - beam->aborted = 1; - beam_close(beam); - leave_yellow(beam, &bl); + beam_shutdown(beam, APR_SHUTDOWN_WRITE); } - return APR_SUCCESS; -} - -apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block) -{ - apr_status_t status; - h2_beam_lock bl; - - if ((status = enter_yellow(beam, &bl)) == APR_SUCCESS) { - status = wait_empty(beam, block, bl.mutex); - leave_yellow(beam, &bl); + else { + /* receiver aborts */ + beam_shutdown(beam, APR_SHUTDOWN_READ); } - return status; + apr_thread_cond_broadcast(beam->change); + apr_thread_mutex_unlock(beam->lock); } -static void move_to_hold(h2_bucket_beam *beam, - apr_bucket_brigade *sender_bb) +void h2_beam_close(h2_bucket_beam *beam, conn_rec *c) { - apr_bucket *b; - while (sender_bb && !APR_BRIGADE_EMPTY(sender_bb)) { - b = APR_BRIGADE_FIRST(sender_bb); - APR_BUCKET_REMOVE(b); - H2_BLIST_INSERT_TAIL(&beam->send_list, b); + apr_thread_mutex_lock(beam->lock); + if (!beam->closed) { + /* should only be called from sender */ + ap_assert(c == beam->from); + beam->closed = 1; + if (beam->send_cb) { + beam->send_cb(beam->send_ctx, beam); + } + if (beam->was_empty_cb && buffer_is_empty(beam)) { + beam->was_empty_cb(beam->was_empty_ctx, beam); + } + apr_thread_cond_broadcast(beam->change); } + apr_thread_mutex_unlock(beam->lock); } -static apr_status_t append_bucket(h2_bucket_beam *beam, - apr_bucket *b, +static apr_status_t append_bucket(h2_bucket_beam *beam, + apr_bucket_brigade *bb, apr_read_type_e block, apr_size_t *pspace_left, - h2_beam_lock *pbl) + apr_off_t *pwritten) { + apr_bucket *b; const char *data; apr_size_t len; - apr_status_t status; - int can_beam = 0, check_len; + apr_status_t rv = APR_SUCCESS; + int can_beam = 0; + (void)block; if (beam->aborted) { - return APR_ECONNABORTED; + rv = APR_ECONNABORTED; + goto cleanup; } - + + ap_assert(beam->pool); + + b = APR_BRIGADE_FIRST(bb); if (APR_BUCKET_IS_METADATA(b)) { - if (APR_BUCKET_IS_EOS(b)) { - beam->closed = 1; - } APR_BUCKET_REMOVE(b); - H2_BLIST_INSERT_TAIL(&beam->send_list, b); - return APR_SUCCESS; + apr_bucket_setaside(b, beam->pool); + H2_BLIST_INSERT_TAIL(&beam->buckets_to_send, b); + goto cleanup; + } + /* non meta bucket */ + + /* in case of indeterminate length, we need to read the bucket, + * so that it transforms itself into something stable. */ + if (b->length == ((apr_size_t)-1)) { + rv = apr_bucket_read(b, &data, &len, APR_BLOCK_READ); + if (rv != APR_SUCCESS) goto cleanup; } - else if (APR_BUCKET_IS_FILE(b)) { + + if (APR_BUCKET_IS_FILE(b)) { /* For file buckets the problem is their internal readpool that * is used on the first read to allocate buffer/mmap. * Since setting aside a file bucket will de-register the @@ -806,478 +470,414 @@ static apr_status_t append_bucket(h2_bucket_beam *beam, * of open file handles and rather use a less efficient beam * transport. */ apr_bucket_file *bf = b->data; - apr_file_t *fd = bf->fd; - can_beam = (bf->refcount.refcount == 1); - if (can_beam && beam->can_beam_fn) { - can_beam = beam->can_beam_fn(beam->can_beam_ctx, beam, fd); - } - check_len = !can_beam; + can_beam = !beam->copy_files && (bf->refcount.refcount == 1); } - else { - if (b->length == ((apr_size_t)-1)) { - const char *data; - status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ); - if (status != APR_SUCCESS) { - return status; - } - } - check_len = 1; + else if (bucket_is_mmap(b)) { + can_beam = !beam->copy_files; } - - if (check_len) { - if (b->length > *pspace_left) { - apr_bucket_split(b, *pspace_left); - } - *pspace_left -= b->length; + + if (b->length == 0) { + apr_bucket_delete(b); + rv = APR_SUCCESS; + goto cleanup; } - /* The fundamental problem is that reading a sender bucket from - * a receiver thread is a total NO GO, because the bucket might use - * its pool/bucket_alloc from a foreign thread and that will - * corrupt. */ - status = APR_ENOTIMPL; - if (APR_BUCKET_IS_TRANSIENT(b)) { - /* this takes care of transient buckets and converts them - * into heap ones. Other bucket types might or might not be - * affected by this. */ - status = apr_bucket_setaside(b, beam->send_pool); + if (!*pspace_left) { + rv = APR_EAGAIN; + goto cleanup; } - else if (APR_BUCKET_IS_HEAP(b)) { - /* For heap buckets read from a receiver thread is fine. The + + /* bucket is accepted and added to beam->buckets_to_send */ + if (APR_BUCKET_IS_HEAP(b)) { + /* For heap buckets, a read from a receiver thread is fine. The * data will be there and live until the bucket itself is * destroyed. */ - status = APR_SUCCESS; + rv = apr_bucket_setaside(b, beam->pool); + if (rv != APR_SUCCESS) goto cleanup; } - else if (APR_BUCKET_IS_POOL(b)) { - /* pool buckets are bastards that register at pool cleanup - * to morph themselves into heap buckets. That may happen anytime, - * even after the bucket data pointer has been read. So at - * any time inside the receiver thread, the pool bucket memory - * may disappear. yikes. */ - status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ); - if (status == APR_SUCCESS) { - apr_bucket_heap_make(b, data, len, NULL); - } + else if (can_beam && (APR_BUCKET_IS_FILE(b) || bucket_is_mmap(b))) { + rv = apr_bucket_setaside(b, beam->pool); + if (rv != APR_SUCCESS) goto cleanup; } - else if (APR_BUCKET_IS_FILE(b) && can_beam) { - status = apr_bucket_setaside(b, beam->send_pool); + else { + /* we know of no special shortcut to transfer the bucket to + * another pool without copying. So we make it a heap bucket. */ + apr_bucket *b2; + + rv = apr_bucket_read(b, &data, &len, APR_BLOCK_READ); + if (rv != APR_SUCCESS) goto cleanup; + /* this allocates and copies data */ + b2 = apr_bucket_heap_create(data, len, NULL, bb->bucket_alloc); + apr_bucket_delete(b); + b = b2; + APR_BRIGADE_INSERT_HEAD(bb, b); } - if (status == APR_ENOTIMPL) { - /* we have no knowledge about the internals of this bucket, - * but hope that after read, its data stays immutable for the - * lifetime of the bucket. (see pool bucket handling above for - * a counter example). - * We do the read while in the sender thread, so that the bucket may - * use pools/allocators safely. */ - status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ); - if (status == APR_SUCCESS) { - status = apr_bucket_setaside(b, beam->send_pool); - } + APR_BUCKET_REMOVE(b); + H2_BLIST_INSERT_TAIL(&beam->buckets_to_send, b); + *pwritten += (apr_off_t)b->length; + if (b->length > *pspace_left) { + *pspace_left = 0; } - - if (status != APR_SUCCESS && status != APR_ENOTIMPL) { - return status; + else { + *pspace_left -= b->length; } - - APR_BUCKET_REMOVE(b); - H2_BLIST_INSERT_TAIL(&beam->send_list, b); - beam->sent_bytes += b->length; - - return APR_SUCCESS; -} -void h2_beam_send_from(h2_bucket_beam *beam, apr_pool_t *p) -{ - h2_beam_lock bl; - /* Called from the sender thread to add buckets to the beam */ - if (enter_yellow(beam, &bl) == APR_SUCCESS) { - r_purge_sent(beam); - beam_set_send_pool(beam, p); - leave_yellow(beam, &bl); - } +cleanup: + return rv; } -apr_status_t h2_beam_send(h2_bucket_beam *beam, +apr_status_t h2_beam_send(h2_bucket_beam *beam, conn_rec *from, apr_bucket_brigade *sender_bb, - apr_read_type_e block) + apr_read_type_e block, + apr_off_t *pwritten) { - apr_bucket *b; apr_status_t rv = APR_SUCCESS; apr_size_t space_left = 0; - h2_beam_lock bl; + int was_empty; + + ap_assert(beam->pool); /* Called from the sender thread to add buckets to the beam */ - if (enter_yellow(beam, &bl) == APR_SUCCESS) { - ap_assert(beam->send_pool); - r_purge_sent(beam); - + apr_thread_mutex_lock(beam->lock); + ap_assert(beam->from == from); + ap_assert(sender_bb); + H2_BEAM_LOG(beam, from, APLOG_TRACE2, rv, "start send", sender_bb); + purge_consumed_buckets(beam); + *pwritten = 0; + was_empty = buffer_is_empty(beam); + + space_left = calc_space_left(beam); + while (!APR_BRIGADE_EMPTY(sender_bb) && APR_SUCCESS == rv) { + rv = append_bucket(beam, sender_bb, block, &space_left, pwritten); if (beam->aborted) { - move_to_hold(beam, sender_bb); - rv = APR_ECONNABORTED; - } - else if (sender_bb) { - int force_report = !APR_BRIGADE_EMPTY(sender_bb); - - space_left = calc_space_left(beam); - while (!APR_BRIGADE_EMPTY(sender_bb) && APR_SUCCESS == rv) { - if (space_left <= 0) { - report_prod_io(beam, force_report, &bl); - r_purge_sent(beam); - rv = wait_not_full(beam, block, &space_left, &bl); - if (APR_SUCCESS != rv) { - break; - } - } - b = APR_BRIGADE_FIRST(sender_bb); - rv = append_bucket(beam, b, block, &space_left, &bl); + goto cleanup; + } + else if (APR_EAGAIN == rv) { + /* bucket was not added, as beam buffer has no space left. + * Trigger event callbacks, so receiver can know there is something + * to receive before we do a conditional wait. */ + purge_consumed_buckets(beam); + if (beam->send_cb) { + beam->send_cb(beam->send_ctx, beam); } - - report_prod_io(beam, force_report, &bl); - apr_thread_cond_broadcast(beam->change); + if (was_empty && beam->was_empty_cb) { + beam->was_empty_cb(beam->was_empty_ctx, beam); + } + rv = wait_not_full(beam, from, block, &space_left); + if (APR_SUCCESS != rv) { + break; + } + was_empty = buffer_is_empty(beam); } - report_consumption(beam, &bl); - leave_yellow(beam, &bl); } + +cleanup: + if (beam->send_cb && !buffer_is_empty(beam)) { + beam->send_cb(beam->send_ctx, beam); + } + if (was_empty && beam->was_empty_cb && !buffer_is_empty(beam)) { + beam->was_empty_cb(beam->was_empty_ctx, beam); + } + apr_thread_cond_broadcast(beam->change); + + report_consumption(beam, 1); + if (beam->aborted) { + rv = APR_ECONNABORTED; + } + H2_BEAM_LOG(beam, from, APLOG_TRACE2, rv, "end send", sender_bb); + if(rv != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(rv) && sender_bb != NULL) { + apr_brigade_cleanup(sender_bb); + } + apr_thread_mutex_unlock(beam->lock); return rv; } -apr_status_t h2_beam_receive(h2_bucket_beam *beam, +apr_status_t h2_beam_receive(h2_bucket_beam *beam, + conn_rec *to, apr_bucket_brigade *bb, apr_read_type_e block, apr_off_t readbytes) { - h2_beam_lock bl; apr_bucket *bsender, *brecv, *ng; int transferred = 0; - apr_status_t status = APR_SUCCESS; + apr_status_t rv = APR_SUCCESS; apr_off_t remain; - int transferred_buckets = 0; - - /* Called from the receiver thread to take buckets from the beam */ - if (enter_yellow(beam, &bl) == APR_SUCCESS) { - if (readbytes <= 0) { - readbytes = APR_SIZE_MAX; - } - remain = readbytes; - + int consumed_buckets = 0; + + apr_thread_mutex_lock(beam->lock); + H2_BEAM_LOG(beam, to, APLOG_TRACE2, 0, "start receive", bb); + if (readbytes <= 0) { + readbytes = (apr_off_t)APR_SIZE_MAX; + } + remain = readbytes; + transfer: - if (beam->aborted) { - recv_buffer_cleanup(beam, &bl); - status = APR_ECONNABORTED; - goto leave; - } + if (beam->aborted) { + beam_shutdown(beam, APR_SHUTDOWN_READ); + rv = APR_ECONNABORTED; + goto leave; + } - /* transfer enough buckets from our receiver brigade, if we have one */ - while (remain >= 0 - && beam->recv_buffer - && !APR_BRIGADE_EMPTY(beam->recv_buffer)) { - - brecv = APR_BRIGADE_FIRST(beam->recv_buffer); - if (brecv->length > 0 && remain <= 0) { - break; - } - APR_BUCKET_REMOVE(brecv); - APR_BRIGADE_INSERT_TAIL(bb, brecv); - remain -= brecv->length; - ++transferred; + ap_assert(beam->pool); + + /* transfer from our sender brigade, transforming sender buckets to + * receiver ones until we have enough */ + while (remain >= 0 && !H2_BLIST_EMPTY(&beam->buckets_to_send)) { + + brecv = NULL; + bsender = H2_BLIST_FIRST(&beam->buckets_to_send); + if (bsender->length > 0 && remain <= 0) { + break; } - /* transfer from our sender brigade, transforming sender buckets to - * receiver ones until we have enough */ - while (remain >= 0 && !H2_BLIST_EMPTY(&beam->send_list)) { - - brecv = NULL; - bsender = H2_BLIST_FIRST(&beam->send_list); - if (bsender->length > 0 && remain <= 0) { - break; + if (APR_BUCKET_IS_METADATA(bsender)) { + /* we need a real copy into the receivers bucket_alloc */ + if (APR_BUCKET_IS_EOS(bsender)) { + /* this closes the beam */ + beam->closed = 1; + brecv = apr_bucket_eos_create(bb->bucket_alloc); } - - if (APR_BUCKET_IS_METADATA(bsender)) { - if (APR_BUCKET_IS_EOS(bsender)) { - brecv = apr_bucket_eos_create(bb->bucket_alloc); - beam->close_sent = 1; - } - else if (APR_BUCKET_IS_FLUSH(bsender)) { - brecv = apr_bucket_flush_create(bb->bucket_alloc); - } - else if (AP_BUCKET_IS_ERROR(bsender)) { - ap_bucket_error *eb = (ap_bucket_error *)bsender; - brecv = ap_bucket_error_create(eb->status, eb->data, - bb->p, bb->bucket_alloc); - } + else if (APR_BUCKET_IS_FLUSH(bsender)) { + brecv = apr_bucket_flush_create(bb->bucket_alloc); } - else if (bsender->length == 0) { - APR_BUCKET_REMOVE(bsender); - H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender); - continue; +#if AP_HAS_RESPONSE_BUCKETS + else if (AP_BUCKET_IS_RESPONSE(bsender)) { + brecv = ap_bucket_response_clone(bsender, bb->p, bb->bucket_alloc); } - else if (APR_BUCKET_IS_FILE(bsender)) { - /* This is set aside into the target brigade pool so that - * any read operation messes with that pool and not - * the sender one. */ - apr_bucket_file *f = (apr_bucket_file *)bsender->data; - apr_file_t *fd = f->fd; - int setaside = (f->readpool != bb->p); - - if (setaside) { - status = apr_file_setaside(&fd, fd, bb->p); - if (status != APR_SUCCESS) { - goto leave; - } - ++beam->files_beamed; - } - ng = apr_brigade_insert_file(bb, fd, bsender->start, bsender->length, - bb->p); -#if APR_HAS_MMAP - /* disable mmap handling as this leads to segfaults when - * the underlying file is changed while memory pointer has - * been handed out. See also PR 59348 */ - apr_bucket_file_enable_mmap(ng, 0); -#endif - APR_BUCKET_REMOVE(bsender); - H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender); - - remain -= bsender->length; - ++transferred; - ++transferred_buckets; - continue; + else if (AP_BUCKET_IS_REQUEST(bsender)) { + brecv = ap_bucket_request_clone(bsender, bb->p, bb->bucket_alloc); } - else { - /* create a "receiver" standin bucket. we took care about the - * underlying sender bucket and its data when we placed it into - * the sender brigade. - * the beam bucket will notify us on destruction that bsender is - * no longer needed. */ - brecv = h2_beam_bucket_create(beam, bsender, bb->bucket_alloc, - beam->buckets_sent++); + else if (AP_BUCKET_IS_HEADERS(bsender)) { + brecv = ap_bucket_headers_clone(bsender, bb->p, bb->bucket_alloc); } - - /* Place the sender bucket into our hold, to be destroyed when no - * receiver bucket references it any more. */ - APR_BUCKET_REMOVE(bsender); - H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender); - - beam->received_bytes += bsender->length; - ++transferred_buckets; - - if (brecv) { - APR_BRIGADE_INSERT_TAIL(bb, brecv); - remain -= brecv->length; - ++transferred; +#else + else if (H2_BUCKET_IS_HEADERS(bsender)) { + brecv = h2_bucket_headers_clone(bsender, bb->p, bb->bucket_alloc); } - else { - /* let outside hook determine how bucket is beamed */ - leave_yellow(beam, &bl); - brecv = h2_beam_bucket(beam, bb, bsender); - enter_yellow(beam, &bl); - - while (brecv && brecv != APR_BRIGADE_SENTINEL(bb)) { - ++transferred; - remain -= brecv->length; - brecv = APR_BUCKET_NEXT(brecv); - } +#endif /* AP_HAS_RESPONSE_BUCKETS */ + else if (AP_BUCKET_IS_ERROR(bsender)) { + ap_bucket_error *eb = bsender->data; + brecv = ap_bucket_error_create(eb->status, eb->data, + bb->p, bb->bucket_alloc); } } - - if (remain < 0) { - /* too much, put some back into out recv_buffer */ - remain = readbytes; - for (brecv = APR_BRIGADE_FIRST(bb); - brecv != APR_BRIGADE_SENTINEL(bb); - brecv = APR_BUCKET_NEXT(brecv)) { - remain -= (beam->tx_mem_limits? bucket_mem_used(brecv) - : brecv->length); - if (remain < 0) { - apr_bucket_split(brecv, brecv->length+remain); - beam->recv_buffer = apr_brigade_split_ex(bb, - APR_BUCKET_NEXT(brecv), - beam->recv_buffer); - break; - } - } + else if (bsender->length == 0) { + /* nop */ } - - if (beam->closed && buffer_is_empty(beam)) { - /* beam is closed and we have nothing more to receive */ - if (!beam->close_sent) { - apr_bucket *b = apr_bucket_eos_create(bb->bucket_alloc); - APR_BRIGADE_INSERT_TAIL(bb, b); - beam->close_sent = 1; - ++transferred; - status = APR_SUCCESS; - } +#if APR_HAS_MMAP + else if (APR_BUCKET_IS_MMAP(bsender)) { + apr_bucket_mmap *bmmap = bsender->data; + apr_mmap_t *mmap; + rv = apr_mmap_dup(&mmap, bmmap->mmap, bb->p); + if (rv != APR_SUCCESS) goto leave; + brecv = apr_bucket_mmap_create(mmap, bsender->start, bsender->length, bb->bucket_alloc); } - - if (transferred_buckets > 0) { - if (beam->cons_ev_cb) { - beam->cons_ev_cb(beam->cons_ctx, beam); +#endif + else if (APR_BUCKET_IS_FILE(bsender)) { + /* This is setaside into the target brigade pool so that + * any read operation messes with that pool and not + * the sender one. */ + apr_bucket_file *f = (apr_bucket_file *)bsender->data; + apr_file_t *fd = f->fd; + int setaside = (f->readpool != bb->p); + + if (setaside) { + rv = apr_file_setaside(&fd, fd, bb->p); + if (rv != APR_SUCCESS) goto leave; } - } - - if (transferred) { - apr_thread_cond_broadcast(beam->change); - status = APR_SUCCESS; + ng = apr_brigade_insert_file(bb, fd, bsender->start, (apr_off_t)bsender->length, + bb->p); +#if APR_HAS_MMAP + /* disable mmap handling as this leads to segfaults when + * the underlying file is changed while memory pointer has + * been handed out. See also PR 59348 */ + apr_bucket_file_enable_mmap(ng, 0); +#endif + remain -= bsender->length; + ++transferred; } else { - status = wait_not_empty(beam, block, bl.mutex); - if (status != APR_SUCCESS) { - goto leave; - } - goto transfer; + const char *data; + apr_size_t dlen; + /* we did that when the bucket was added, so this should + * give us the same data as before without changing the bucket + * or anything (pool) connected to it. */ + rv = apr_bucket_read(bsender, &data, &dlen, APR_BLOCK_READ); + if (rv != APR_SUCCESS) goto leave; + rv = apr_brigade_write(bb, NULL, NULL, data, dlen); + if (rv != APR_SUCCESS) goto leave; + + remain -= dlen; + ++transferred; + } + + if (brecv) { + /* we have a proxy that we can give the receiver */ + APR_BRIGADE_INSERT_TAIL(bb, brecv); + remain -= brecv->length; + ++transferred; + } + APR_BUCKET_REMOVE(bsender); + H2_BLIST_INSERT_TAIL(&beam->buckets_consumed, bsender); + beam->recv_bytes += bsender->length; + ++consumed_buckets; + } + + if (beam->recv_cb && consumed_buckets > 0) { + beam->recv_cb(beam->recv_ctx, beam); + } + + if (transferred) { + apr_thread_cond_broadcast(beam->change); + rv = APR_SUCCESS; + } + else if (beam->aborted) { + rv = APR_ECONNABORTED; + } + else if (beam->closed) { + rv = APR_EOF; + } + else { + rv = wait_not_empty(beam, to, block); + if (rv != APR_SUCCESS) { + goto leave; } -leave: - leave_yellow(beam, &bl); + goto transfer; + } + +leave: + H2_BEAM_LOG(beam, to, APLOG_TRACE2, rv, "end receive", bb); + if (rv == APR_EAGAIN && beam->eagain_cb) { + beam->eagain_cb(beam->eagain_ctx, beam); } - return status; + apr_thread_mutex_unlock(beam->lock); + return rv; } void h2_beam_on_consumed(h2_bucket_beam *beam, - h2_beam_ev_callback *ev_cb, h2_beam_io_callback *io_cb, void *ctx) { - h2_beam_lock bl; - if (enter_yellow(beam, &bl) == APR_SUCCESS) { - beam->cons_ev_cb = ev_cb; - beam->cons_io_cb = io_cb; - beam->cons_ctx = ctx; - leave_yellow(beam, &bl); - } + apr_thread_mutex_lock(beam->lock); + beam->cons_io_cb = io_cb; + beam->cons_ctx = ctx; + apr_thread_mutex_unlock(beam->lock); } -void h2_beam_on_produced(h2_bucket_beam *beam, - h2_beam_io_callback *io_cb, void *ctx) +void h2_beam_on_received(h2_bucket_beam *beam, + h2_beam_ev_callback *recv_cb, void *ctx) { - h2_beam_lock bl; - if (enter_yellow(beam, &bl) == APR_SUCCESS) { - beam->prod_io_cb = io_cb; - beam->prod_ctx = ctx; - leave_yellow(beam, &bl); - } + apr_thread_mutex_lock(beam->lock); + beam->recv_cb = recv_cb; + beam->recv_ctx = ctx; + apr_thread_mutex_unlock(beam->lock); } -void h2_beam_on_file_beam(h2_bucket_beam *beam, - h2_beam_can_beam_callback *cb, void *ctx) +void h2_beam_on_eagain(h2_bucket_beam *beam, + h2_beam_ev_callback *eagain_cb, void *ctx) { - h2_beam_lock bl; - - if (enter_yellow(beam, &bl) == APR_SUCCESS) { - beam->can_beam_fn = cb; - beam->can_beam_ctx = ctx; - leave_yellow(beam, &bl); - } + apr_thread_mutex_lock(beam->lock); + beam->eagain_cb = eagain_cb; + beam->eagain_ctx = ctx; + apr_thread_mutex_unlock(beam->lock); } +void h2_beam_on_send(h2_bucket_beam *beam, + h2_beam_ev_callback *send_cb, void *ctx) +{ + apr_thread_mutex_lock(beam->lock); + beam->send_cb = send_cb; + beam->send_ctx = ctx; + apr_thread_mutex_unlock(beam->lock); +} -apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam) +void h2_beam_on_was_empty(h2_bucket_beam *beam, + h2_beam_ev_callback *was_empty_cb, void *ctx) { - apr_bucket *b; - apr_off_t l = 0; - h2_beam_lock bl; - - if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) { - for (b = H2_BLIST_FIRST(&beam->send_list); - b != H2_BLIST_SENTINEL(&beam->send_list); - b = APR_BUCKET_NEXT(b)) { - /* should all have determinate length */ - l += b->length; - } - leave_yellow(beam, &bl); - } - return l; + apr_thread_mutex_lock(beam->lock); + beam->was_empty_cb = was_empty_cb; + beam->was_empty_ctx = ctx; + apr_thread_mutex_unlock(beam->lock); } -apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam) + +static apr_off_t get_buffered_data_len(h2_bucket_beam *beam) { apr_bucket *b; apr_off_t l = 0; - h2_beam_lock bl; - - if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) { - for (b = H2_BLIST_FIRST(&beam->send_list); - b != H2_BLIST_SENTINEL(&beam->send_list); - b = APR_BUCKET_NEXT(b)) { - l += bucket_mem_used(b); - } - leave_yellow(beam, &bl); + + for (b = H2_BLIST_FIRST(&beam->buckets_to_send); + b != H2_BLIST_SENTINEL(&beam->buckets_to_send); + b = APR_BUCKET_NEXT(b)) { + /* should all have determinate length */ + l += b->length; } return l; } -int h2_beam_empty(h2_bucket_beam *beam) +apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam) { - int empty = 1; - h2_beam_lock bl; - - if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) { - empty = (H2_BLIST_EMPTY(&beam->send_list) - && (!beam->recv_buffer || APR_BRIGADE_EMPTY(beam->recv_buffer))); - leave_yellow(beam, &bl); - } - return empty; -} + apr_off_t l = 0; -int h2_beam_holds_proxies(h2_bucket_beam *beam) -{ - int has_proxies = 1; - h2_beam_lock bl; - - if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) { - has_proxies = !H2_BPROXY_LIST_EMPTY(&beam->proxies); - leave_yellow(beam, &bl); - } - return has_proxies; + apr_thread_mutex_lock(beam->lock); + l = get_buffered_data_len(beam); + apr_thread_mutex_unlock(beam->lock); + return l; } -int h2_beam_was_received(h2_bucket_beam *beam) +apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam) { - int happend = 0; - h2_beam_lock bl; - - if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) { - happend = (beam->received_bytes > 0); - leave_yellow(beam, &bl); - } - return happend; -} + apr_bucket *b; + apr_off_t l = 0; -apr_size_t h2_beam_get_files_beamed(h2_bucket_beam *beam) -{ - apr_size_t n = 0; - h2_beam_lock bl; - - if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) { - n = beam->files_beamed; - leave_yellow(beam, &bl); + apr_thread_mutex_lock(beam->lock); + for (b = H2_BLIST_FIRST(&beam->buckets_to_send); + b != H2_BLIST_SENTINEL(&beam->buckets_to_send); + b = APR_BUCKET_NEXT(b)) { + l += bucket_mem_used(b); } - return n; + apr_thread_mutex_unlock(beam->lock); + return l; } -int h2_beam_no_files(void *ctx, h2_bucket_beam *beam, apr_file_t *file) +int h2_beam_empty(h2_bucket_beam *beam) { - return 0; + int empty = 1; + + apr_thread_mutex_lock(beam->lock); + empty = buffer_is_empty(beam); + apr_thread_mutex_unlock(beam->lock); + return empty; } int h2_beam_report_consumption(h2_bucket_beam *beam) { - h2_beam_lock bl; int rv = 0; - if (enter_yellow(beam, &bl) == APR_SUCCESS) { - rv = report_consumption(beam, &bl); - leave_yellow(beam, &bl); - } + + apr_thread_mutex_lock(beam->lock); + rv = report_consumption(beam, 1); + apr_thread_mutex_unlock(beam->lock); return rv; } -void h2_beam_log(h2_bucket_beam *beam, conn_rec *c, int level, const char *msg) +int h2_beam_is_complete(h2_bucket_beam *beam) { - if (beam && APLOG_C_IS_LEVEL(c,level)) { - ap_log_cerror(APLOG_MARK, level, 0, c, - "beam(%ld-%d,%s,closed=%d,aborted=%d,empty=%d,buf=%ld): %s", - (c->master? c->master->id : c->id), beam->id, beam->tag, - beam->closed, beam->aborted, h2_beam_empty(beam), - (long)h2_beam_get_buffered(beam), msg); + int rv = 0; + + apr_thread_mutex_lock(beam->lock); + if (beam->closed) + rv = 1; + else { + apr_bucket *b; + for (b = H2_BLIST_FIRST(&beam->buckets_to_send); + b != H2_BLIST_SENTINEL(&beam->buckets_to_send); + b = APR_BUCKET_NEXT(b)) { + if (APR_BUCKET_IS_EOS(b)) { + rv = 1; + break; + } + } } + apr_thread_mutex_unlock(beam->lock); + return rv; } - - |