summaryrefslogtreecommitdiffstats
path: root/modules/http2/h2_bucket_beam.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--modules/http2/h2_bucket_beam.c825
1 files changed, 825 insertions, 0 deletions
diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c
new file mode 100644
index 0000000..cbf7f34
--- /dev/null
+++ b/modules/http2/h2_bucket_beam.c
@@ -0,0 +1,825 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <apr_lib.h>
+#include <apr_atomic.h>
+#include <apr_strings.h>
+#include <apr_time.h>
+#include <apr_buckets.h>
+#include <apr_thread_mutex.h>
+#include <apr_thread_cond.h>
+
+#include <httpd.h>
+#include <http_protocol.h>
+#include <http_log.h>
+
+#include "h2_private.h"
+#include "h2_conn_ctx.h"
+#include "h2_headers.h"
+#include "h2_util.h"
+#include "h2_bucket_beam.h"
+
+
+#define H2_BLIST_INIT(b) APR_RING_INIT(&(b)->list, apr_bucket, link);
+#define H2_BLIST_SENTINEL(b) APR_RING_SENTINEL(&(b)->list, apr_bucket, link)
+#define H2_BLIST_EMPTY(b) APR_RING_EMPTY(&(b)->list, apr_bucket, link)
+#define H2_BLIST_FIRST(b) APR_RING_FIRST(&(b)->list)
+#define H2_BLIST_LAST(b) APR_RING_LAST(&(b)->list)
+#define H2_BLIST_INSERT_HEAD(b, e) do { \
+ apr_bucket *ap__b = (e); \
+ APR_RING_INSERT_HEAD(&(b)->list, ap__b, apr_bucket, link); \
+ } while (0)
+#define H2_BLIST_INSERT_TAIL(b, e) do { \
+ apr_bucket *ap__b = (e); \
+ APR_RING_INSERT_TAIL(&(b)->list, ap__b, apr_bucket, link); \
+ } while (0)
+#define H2_BLIST_CONCAT(a, b) do { \
+ APR_RING_CONCAT(&(a)->list, &(b)->list, apr_bucket, link); \
+ } while (0)
+#define H2_BLIST_PREPEND(a, b) do { \
+ APR_RING_PREPEND(&(a)->list, &(b)->list, apr_bucket, link); \
+ } while (0)
+
+
+static int buffer_is_empty(h2_bucket_beam *beam);
+static apr_off_t get_buffered_data_len(h2_bucket_beam *beam);
+
+static int h2_blist_count(h2_blist *blist)
+{
+ apr_bucket *b;
+ int count = 0;
+
+ for (b = H2_BLIST_FIRST(blist); b != H2_BLIST_SENTINEL(blist);
+ b = APR_BUCKET_NEXT(b)) {
+ ++count;
+ }
+ return count;
+}
+
+#define H2_BEAM_LOG(beam, c, level, rv, msg, bb) \
+ do { \
+ if (APLOG_C_IS_LEVEL((c),(level))) { \
+ char buffer[4 * 1024]; \
+ apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]); \
+ len = bb? h2_util_bb_print(buffer, bmax, "", "", bb) : 0; \
+ ap_log_cerror(APLOG_MARK, (level), rv, (c), \
+ "BEAM[%s,%s%sdata=%ld,buckets(send/consumed)=%d/%d]: %s %s", \
+ (beam)->name, \
+ (beam)->aborted? "aborted," : "", \
+ buffer_is_empty(beam)? "empty," : "", \
+ (long)get_buffered_data_len(beam), \
+ h2_blist_count(&(beam)->buckets_to_send), \
+ h2_blist_count(&(beam)->buckets_consumed), \
+ (msg), len? buffer : ""); \
+ } \
+ } while (0)
+
+
+static int bucket_is_mmap(apr_bucket *b)
+{
+#if APR_HAS_MMAP
+ return APR_BUCKET_IS_MMAP(b);
+#else
+ /* if it is not defined as enabled, it should always be no */
+ return 0;
+#endif
+}
+
+static apr_off_t bucket_mem_used(apr_bucket *b)
+{
+ if (APR_BUCKET_IS_FILE(b) || bucket_is_mmap(b)) {
+ return 0;
+ }
+ else {
+ /* should all have determinate length */
+ return (apr_off_t)b->length;
+ }
+}
+
+static int report_consumption(h2_bucket_beam *beam, int locked)
+{
+ int rv = 0;
+ apr_off_t len = beam->recv_bytes - beam->recv_bytes_reported;
+ h2_beam_io_callback *cb = beam->cons_io_cb;
+
+ if (len > 0) {
+ if (cb) {
+ void *ctx = beam->cons_ctx;
+
+ if (locked) apr_thread_mutex_unlock(beam->lock);
+ cb(ctx, beam, len);
+ if (locked) apr_thread_mutex_lock(beam->lock);
+ rv = 1;
+ }
+ beam->recv_bytes_reported += len;
+ }
+ return rv;
+}
+
+static apr_size_t calc_buffered(h2_bucket_beam *beam)
+{
+ apr_size_t len = 0;
+ apr_bucket *b;
+ for (b = H2_BLIST_FIRST(&beam->buckets_to_send);
+ b != H2_BLIST_SENTINEL(&beam->buckets_to_send);
+ b = APR_BUCKET_NEXT(b)) {
+ if (b->length == ((apr_size_t)-1)) {
+ /* do not count */
+ }
+ else if (APR_BUCKET_IS_FILE(b) || bucket_is_mmap(b)) {
+ /* if unread, has no real mem footprint. */
+ }
+ else {
+ len += b->length;
+ }
+ }
+ return len;
+}
+
+static void purge_consumed_buckets(h2_bucket_beam *beam)
+{
+ apr_bucket *b;
+ /* delete all sender buckets in purge brigade, needs to be called
+ * from sender thread only */
+ while (!H2_BLIST_EMPTY(&beam->buckets_consumed)) {
+ b = H2_BLIST_FIRST(&beam->buckets_consumed);
+ apr_bucket_delete(b);
+ }
+}
+
+static apr_size_t calc_space_left(h2_bucket_beam *beam)
+{
+ if (beam->max_buf_size > 0) {
+ apr_size_t len = calc_buffered(beam);
+ return (beam->max_buf_size > len? (beam->max_buf_size - len) : 0);
+ }
+ return APR_SIZE_MAX;
+}
+
+static int buffer_is_empty(h2_bucket_beam *beam)
+{
+ return H2_BLIST_EMPTY(&beam->buckets_to_send);
+}
+
+static apr_status_t wait_not_empty(h2_bucket_beam *beam, conn_rec *c, apr_read_type_e block)
+{
+ apr_status_t rv = APR_SUCCESS;
+
+ while (buffer_is_empty(beam) && APR_SUCCESS == rv) {
+ if (beam->aborted) {
+ rv = APR_ECONNABORTED;
+ }
+ else if (beam->closed) {
+ rv = APR_EOF;
+ }
+ else if (APR_BLOCK_READ != block) {
+ rv = APR_EAGAIN;
+ }
+ else if (beam->timeout > 0) {
+ H2_BEAM_LOG(beam, c, APLOG_TRACE2, rv, "wait_not_empty, timeout", NULL);
+ rv = apr_thread_cond_timedwait(beam->change, beam->lock, beam->timeout);
+ }
+ else {
+ H2_BEAM_LOG(beam, c, APLOG_TRACE2, rv, "wait_not_empty, forever", NULL);
+ rv = apr_thread_cond_wait(beam->change, beam->lock);
+ }
+ }
+ return rv;
+}
+
+static apr_status_t wait_not_full(h2_bucket_beam *beam, conn_rec *c,
+ apr_read_type_e block,
+ apr_size_t *pspace_left)
+{
+ apr_status_t rv = APR_SUCCESS;
+ apr_size_t left;
+
+ while (0 == (left = calc_space_left(beam)) && APR_SUCCESS == rv) {
+ if (beam->aborted) {
+ rv = APR_ECONNABORTED;
+ }
+ else if (block != APR_BLOCK_READ) {
+ rv = APR_EAGAIN;
+ }
+ else {
+ if (beam->timeout > 0) {
+ H2_BEAM_LOG(beam, c, APLOG_TRACE2, rv, "wait_not_full, timeout", NULL);
+ rv = apr_thread_cond_timedwait(beam->change, beam->lock, beam->timeout);
+ }
+ else {
+ H2_BEAM_LOG(beam, c, APLOG_TRACE2, rv, "wait_not_full, forever", NULL);
+ rv = apr_thread_cond_wait(beam->change, beam->lock);
+ }
+ }
+ }
+ *pspace_left = left;
+ return rv;
+}
+
+static void h2_blist_cleanup(h2_blist *bl)
+{
+ apr_bucket *e;
+
+ while (!H2_BLIST_EMPTY(bl)) {
+ e = H2_BLIST_FIRST(bl);
+ apr_bucket_delete(e);
+ }
+}
+
+static void beam_shutdown(h2_bucket_beam *beam, apr_shutdown_how_e how)
+{
+ if (!beam->pool) {
+ /* pool being cleared already */
+ return;
+ }
+
+ /* shutdown both receiver and sender? */
+ if (how == APR_SHUTDOWN_READWRITE) {
+ beam->cons_io_cb = NULL;
+ beam->recv_cb = NULL;
+ }
+
+ /* shutdown sender (or both)? */
+ if (how != APR_SHUTDOWN_READ) {
+ h2_blist_cleanup(&beam->buckets_to_send);
+ purge_consumed_buckets(beam);
+ }
+}
+
+static apr_status_t beam_cleanup(void *data)
+{
+ h2_bucket_beam *beam = data;
+ beam_shutdown(beam, APR_SHUTDOWN_READWRITE);
+ beam->pool = NULL; /* the pool is clearing now */
+ return APR_SUCCESS;
+}
+
+apr_status_t h2_beam_destroy(h2_bucket_beam *beam, conn_rec *c)
+{
+ if (beam->pool) {
+ H2_BEAM_LOG(beam, c, APLOG_TRACE2, 0, "destroy", NULL);
+ apr_pool_cleanup_run(beam->pool, beam, beam_cleanup);
+ }
+ H2_BEAM_LOG(beam, c, APLOG_TRACE2, 0, "destroyed", NULL);
+ return APR_SUCCESS;
+}
+
+apr_status_t h2_beam_create(h2_bucket_beam **pbeam, conn_rec *from,
+ apr_pool_t *pool, int id, const char *tag,
+ apr_size_t max_buf_size,
+ apr_interval_time_t timeout)
+{
+ h2_bucket_beam *beam;
+ h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(from);
+ apr_status_t rv;
+
+ beam = apr_pcalloc(pool, sizeof(*beam));
+ beam->pool = pool;
+ beam->from = from;
+ beam->id = id;
+ beam->name = apr_psprintf(pool, "%s-%d-%s",
+ conn_ctx->id, id, tag);
+
+ H2_BLIST_INIT(&beam->buckets_to_send);
+ H2_BLIST_INIT(&beam->buckets_consumed);
+ beam->tx_mem_limits = 1;
+ beam->max_buf_size = max_buf_size;
+ beam->timeout = timeout;
+
+ rv = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT, pool);
+ if (APR_SUCCESS != rv) goto cleanup;
+ rv = apr_thread_cond_create(&beam->change, pool);
+ if (APR_SUCCESS != rv) goto cleanup;
+ apr_pool_pre_cleanup_register(pool, beam, beam_cleanup);
+
+cleanup:
+ H2_BEAM_LOG(beam, from, APLOG_TRACE2, rv, "created", NULL);
+ *pbeam = (APR_SUCCESS == rv)? beam : NULL;
+ return rv;
+}
+
+void h2_beam_buffer_size_set(h2_bucket_beam *beam, apr_size_t buffer_size)
+{
+ apr_thread_mutex_lock(beam->lock);
+ beam->max_buf_size = buffer_size;
+ apr_thread_mutex_unlock(beam->lock);
+}
+
+void h2_beam_set_copy_files(h2_bucket_beam * beam, int enabled)
+{
+ apr_thread_mutex_lock(beam->lock);
+ beam->copy_files = enabled;
+ apr_thread_mutex_unlock(beam->lock);
+}
+
+apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam)
+{
+ apr_size_t buffer_size = 0;
+
+ apr_thread_mutex_lock(beam->lock);
+ buffer_size = beam->max_buf_size;
+ apr_thread_mutex_unlock(beam->lock);
+ return buffer_size;
+}
+
+apr_interval_time_t h2_beam_timeout_get(h2_bucket_beam *beam)
+{
+ apr_interval_time_t timeout;
+
+ apr_thread_mutex_lock(beam->lock);
+ timeout = beam->timeout;
+ apr_thread_mutex_unlock(beam->lock);
+ return timeout;
+}
+
+void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout)
+{
+ apr_thread_mutex_lock(beam->lock);
+ beam->timeout = timeout;
+ apr_thread_mutex_unlock(beam->lock);
+}
+
+void h2_beam_abort(h2_bucket_beam *beam, conn_rec *c)
+{
+ apr_thread_mutex_lock(beam->lock);
+ beam->aborted = 1;
+ if (c == beam->from) {
+ /* sender aborts */
+ if (beam->send_cb) {
+ beam->send_cb(beam->send_ctx, beam);
+ }
+ if (beam->was_empty_cb && buffer_is_empty(beam)) {
+ beam->was_empty_cb(beam->was_empty_ctx, beam);
+ }
+ /* no more consumption reporting to sender */
+ report_consumption(beam, 1);
+ beam->cons_ctx = NULL;
+
+ beam_shutdown(beam, APR_SHUTDOWN_WRITE);
+ }
+ else {
+ /* receiver aborts */
+ beam_shutdown(beam, APR_SHUTDOWN_READ);
+ }
+ apr_thread_cond_broadcast(beam->change);
+ apr_thread_mutex_unlock(beam->lock);
+}
+
+void h2_beam_close(h2_bucket_beam *beam, conn_rec *c)
+{
+ apr_thread_mutex_lock(beam->lock);
+ if (!beam->closed) {
+ /* should only be called from sender */
+ ap_assert(c == beam->from);
+ beam->closed = 1;
+ if (beam->send_cb) {
+ beam->send_cb(beam->send_ctx, beam);
+ }
+ if (beam->was_empty_cb && buffer_is_empty(beam)) {
+ beam->was_empty_cb(beam->was_empty_ctx, beam);
+ }
+ apr_thread_cond_broadcast(beam->change);
+ }
+ apr_thread_mutex_unlock(beam->lock);
+}
+
+static apr_status_t append_bucket(h2_bucket_beam *beam,
+ apr_bucket_brigade *bb,
+ apr_read_type_e block,
+ apr_size_t *pspace_left,
+ apr_off_t *pwritten)
+{
+ apr_bucket *b;
+ const char *data;
+ apr_size_t len;
+ apr_status_t rv = APR_SUCCESS;
+ int can_beam = 0;
+
+ (void)block;
+ if (beam->aborted) {
+ rv = APR_ECONNABORTED;
+ goto cleanup;
+ }
+
+ ap_assert(beam->pool);
+
+ b = APR_BRIGADE_FIRST(bb);
+ if (APR_BUCKET_IS_METADATA(b)) {
+ APR_BUCKET_REMOVE(b);
+ apr_bucket_setaside(b, beam->pool);
+ H2_BLIST_INSERT_TAIL(&beam->buckets_to_send, b);
+ goto cleanup;
+ }
+ /* non meta bucket */
+
+ /* in case of indeterminate length, we need to read the bucket,
+ * so that it transforms itself into something stable. */
+ if (b->length == ((apr_size_t)-1)) {
+ rv = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
+ if (rv != APR_SUCCESS) goto cleanup;
+ }
+
+ if (APR_BUCKET_IS_FILE(b)) {
+ /* For file buckets the problem is their internal readpool that
+ * is used on the first read to allocate buffer/mmap.
+ * Since setting aside a file bucket will de-register the
+ * file cleanup function from the previous pool, we need to
+ * call that only from the sender thread.
+ *
+ * Currently, we do not handle file bucket with refcount > 1 as
+ * the beam is then not in complete control of the file's lifetime.
+ * Which results in the bug that a file get closed by the receiver
+ * while the sender or the beam still have buckets using it.
+ *
+ * Additionally, we allow callbacks to prevent beaming file
+ * handles across. The use case for this is to limit the number
+ * of open file handles and rather use a less efficient beam
+ * transport. */
+ apr_bucket_file *bf = b->data;
+ can_beam = !beam->copy_files && (bf->refcount.refcount == 1);
+ }
+ else if (bucket_is_mmap(b)) {
+ can_beam = !beam->copy_files;
+ }
+
+ if (b->length == 0) {
+ apr_bucket_delete(b);
+ rv = APR_SUCCESS;
+ goto cleanup;
+ }
+
+ if (!*pspace_left) {
+ rv = APR_EAGAIN;
+ goto cleanup;
+ }
+
+ /* bucket is accepted and added to beam->buckets_to_send */
+ if (APR_BUCKET_IS_HEAP(b)) {
+ /* For heap buckets, a read from a receiver thread is fine. The
+ * data will be there and live until the bucket itself is
+ * destroyed. */
+ rv = apr_bucket_setaside(b, beam->pool);
+ if (rv != APR_SUCCESS) goto cleanup;
+ }
+ else if (can_beam && (APR_BUCKET_IS_FILE(b) || bucket_is_mmap(b))) {
+ rv = apr_bucket_setaside(b, beam->pool);
+ if (rv != APR_SUCCESS) goto cleanup;
+ }
+ else {
+ /* we know of no special shortcut to transfer the bucket to
+ * another pool without copying. So we make it a heap bucket. */
+ apr_bucket *b2;
+
+ rv = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
+ if (rv != APR_SUCCESS) goto cleanup;
+ /* this allocates and copies data */
+ b2 = apr_bucket_heap_create(data, len, NULL, bb->bucket_alloc);
+ apr_bucket_delete(b);
+ b = b2;
+ APR_BRIGADE_INSERT_HEAD(bb, b);
+ }
+
+ APR_BUCKET_REMOVE(b);
+ H2_BLIST_INSERT_TAIL(&beam->buckets_to_send, b);
+ *pwritten += (apr_off_t)b->length;
+ if (b->length > *pspace_left) {
+ *pspace_left = 0;
+ }
+ else {
+ *pspace_left -= b->length;
+ }
+
+cleanup:
+ return rv;
+}
+
+apr_status_t h2_beam_send(h2_bucket_beam *beam, conn_rec *from,
+ apr_bucket_brigade *sender_bb,
+ apr_read_type_e block,
+ apr_off_t *pwritten)
+{
+ apr_status_t rv = APR_SUCCESS;
+ apr_size_t space_left = 0;
+ int was_empty;
+
+ ap_assert(beam->pool);
+
+ /* Called from the sender thread to add buckets to the beam */
+ apr_thread_mutex_lock(beam->lock);
+ ap_assert(beam->from == from);
+ ap_assert(sender_bb);
+ H2_BEAM_LOG(beam, from, APLOG_TRACE2, rv, "start send", sender_bb);
+ purge_consumed_buckets(beam);
+ *pwritten = 0;
+ was_empty = buffer_is_empty(beam);
+
+ space_left = calc_space_left(beam);
+ while (!APR_BRIGADE_EMPTY(sender_bb) && APR_SUCCESS == rv) {
+ rv = append_bucket(beam, sender_bb, block, &space_left, pwritten);
+ if (beam->aborted) {
+ goto cleanup;
+ }
+ else if (APR_EAGAIN == rv) {
+ /* bucket was not added, as beam buffer has no space left.
+ * Trigger event callbacks, so receiver can know there is something
+ * to receive before we do a conditional wait. */
+ purge_consumed_buckets(beam);
+ if (beam->send_cb) {
+ beam->send_cb(beam->send_ctx, beam);
+ }
+ if (was_empty && beam->was_empty_cb) {
+ beam->was_empty_cb(beam->was_empty_ctx, beam);
+ }
+ rv = wait_not_full(beam, from, block, &space_left);
+ if (APR_SUCCESS != rv) {
+ break;
+ }
+ was_empty = buffer_is_empty(beam);
+ }
+ }
+
+cleanup:
+ if (beam->send_cb && !buffer_is_empty(beam)) {
+ beam->send_cb(beam->send_ctx, beam);
+ }
+ if (was_empty && beam->was_empty_cb && !buffer_is_empty(beam)) {
+ beam->was_empty_cb(beam->was_empty_ctx, beam);
+ }
+ apr_thread_cond_broadcast(beam->change);
+
+ report_consumption(beam, 1);
+ if (beam->aborted) {
+ rv = APR_ECONNABORTED;
+ }
+ H2_BEAM_LOG(beam, from, APLOG_TRACE2, rv, "end send", sender_bb);
+ apr_thread_mutex_unlock(beam->lock);
+ return rv;
+}
+
+apr_status_t h2_beam_receive(h2_bucket_beam *beam,
+ conn_rec *to,
+ apr_bucket_brigade *bb,
+ apr_read_type_e block,
+ apr_off_t readbytes)
+{
+ apr_bucket *bsender, *brecv, *ng;
+ int transferred = 0;
+ apr_status_t rv = APR_SUCCESS;
+ apr_off_t remain;
+ int consumed_buckets = 0;
+
+ apr_thread_mutex_lock(beam->lock);
+ H2_BEAM_LOG(beam, to, APLOG_TRACE2, 0, "start receive", bb);
+ if (readbytes <= 0) {
+ readbytes = (apr_off_t)APR_SIZE_MAX;
+ }
+ remain = readbytes;
+
+transfer:
+ if (beam->aborted) {
+ beam_shutdown(beam, APR_SHUTDOWN_READ);
+ rv = APR_ECONNABORTED;
+ goto leave;
+ }
+
+ ap_assert(beam->pool);
+
+ /* transfer from our sender brigade, transforming sender buckets to
+ * receiver ones until we have enough */
+ while (remain >= 0 && !H2_BLIST_EMPTY(&beam->buckets_to_send)) {
+
+ brecv = NULL;
+ bsender = H2_BLIST_FIRST(&beam->buckets_to_send);
+ if (bsender->length > 0 && remain <= 0) {
+ break;
+ }
+
+ if (APR_BUCKET_IS_METADATA(bsender)) {
+ /* we need a real copy into the receivers bucket_alloc */
+ if (APR_BUCKET_IS_EOS(bsender)) {
+ /* this closes the beam */
+ beam->closed = 1;
+ brecv = apr_bucket_eos_create(bb->bucket_alloc);
+ }
+ else if (APR_BUCKET_IS_FLUSH(bsender)) {
+ brecv = apr_bucket_flush_create(bb->bucket_alloc);
+ }
+#if AP_HAS_RESPONSE_BUCKETS
+ else if (AP_BUCKET_IS_RESPONSE(bsender)) {
+ brecv = ap_bucket_response_clone(bsender, bb->p, bb->bucket_alloc);
+ }
+ else if (AP_BUCKET_IS_REQUEST(bsender)) {
+ brecv = ap_bucket_request_clone(bsender, bb->p, bb->bucket_alloc);
+ }
+ else if (AP_BUCKET_IS_HEADERS(bsender)) {
+ brecv = ap_bucket_headers_clone(bsender, bb->p, bb->bucket_alloc);
+ }
+#else
+ else if (H2_BUCKET_IS_HEADERS(bsender)) {
+ brecv = h2_bucket_headers_clone(bsender, bb->p, bb->bucket_alloc);
+ }
+#endif /* AP_HAS_RESPONSE_BUCKETS */
+ else if (AP_BUCKET_IS_ERROR(bsender)) {
+ ap_bucket_error *eb = bsender->data;
+ brecv = ap_bucket_error_create(eb->status, eb->data,
+ bb->p, bb->bucket_alloc);
+ }
+ }
+ else if (bsender->length == 0) {
+ /* nop */
+ }
+#if APR_HAS_MMAP
+ else if (APR_BUCKET_IS_MMAP(bsender)) {
+ apr_bucket_mmap *bmmap = bsender->data;
+ apr_mmap_t *mmap;
+ rv = apr_mmap_dup(&mmap, bmmap->mmap, bb->p);
+ if (rv != APR_SUCCESS) goto leave;
+ brecv = apr_bucket_mmap_create(mmap, bsender->start, bsender->length, bb->bucket_alloc);
+ }
+#endif
+ else if (APR_BUCKET_IS_FILE(bsender)) {
+ /* This is setaside into the target brigade pool so that
+ * any read operation messes with that pool and not
+ * the sender one. */
+ apr_bucket_file *f = (apr_bucket_file *)bsender->data;
+ apr_file_t *fd = f->fd;
+ int setaside = (f->readpool != bb->p);
+
+ if (setaside) {
+ rv = apr_file_setaside(&fd, fd, bb->p);
+ if (rv != APR_SUCCESS) goto leave;
+ }
+ ng = apr_brigade_insert_file(bb, fd, bsender->start, (apr_off_t)bsender->length,
+ bb->p);
+#if APR_HAS_MMAP
+ /* disable mmap handling as this leads to segfaults when
+ * the underlying file is changed while memory pointer has
+ * been handed out. See also PR 59348 */
+ apr_bucket_file_enable_mmap(ng, 0);
+#endif
+ remain -= bsender->length;
+ ++transferred;
+ }
+ else {
+ const char *data;
+ apr_size_t dlen;
+ /* we did that when the bucket was added, so this should
+ * give us the same data as before without changing the bucket
+ * or anything (pool) connected to it. */
+ rv = apr_bucket_read(bsender, &data, &dlen, APR_BLOCK_READ);
+ if (rv != APR_SUCCESS) goto leave;
+ rv = apr_brigade_write(bb, NULL, NULL, data, dlen);
+ if (rv != APR_SUCCESS) goto leave;
+
+ remain -= dlen;
+ ++transferred;
+ }
+
+ if (brecv) {
+ /* we have a proxy that we can give the receiver */
+ APR_BRIGADE_INSERT_TAIL(bb, brecv);
+ remain -= brecv->length;
+ ++transferred;
+ }
+ APR_BUCKET_REMOVE(bsender);
+ H2_BLIST_INSERT_TAIL(&beam->buckets_consumed, bsender);
+ beam->recv_bytes += bsender->length;
+ ++consumed_buckets;
+ }
+
+ if (beam->recv_cb && consumed_buckets > 0) {
+ beam->recv_cb(beam->recv_ctx, beam);
+ }
+
+ if (transferred) {
+ apr_thread_cond_broadcast(beam->change);
+ rv = APR_SUCCESS;
+ }
+ else if (beam->aborted) {
+ rv = APR_ECONNABORTED;
+ }
+ else if (beam->closed) {
+ rv = APR_EOF;
+ }
+ else {
+ rv = wait_not_empty(beam, to, block);
+ if (rv != APR_SUCCESS) {
+ goto leave;
+ }
+ goto transfer;
+ }
+
+leave:
+ H2_BEAM_LOG(beam, to, APLOG_TRACE2, rv, "end receive", bb);
+ apr_thread_mutex_unlock(beam->lock);
+ return rv;
+}
+
+void h2_beam_on_consumed(h2_bucket_beam *beam,
+ h2_beam_io_callback *io_cb, void *ctx)
+{
+ apr_thread_mutex_lock(beam->lock);
+ beam->cons_io_cb = io_cb;
+ beam->cons_ctx = ctx;
+ apr_thread_mutex_unlock(beam->lock);
+}
+
+void h2_beam_on_received(h2_bucket_beam *beam,
+ h2_beam_ev_callback *recv_cb, void *ctx)
+{
+ apr_thread_mutex_lock(beam->lock);
+ beam->recv_cb = recv_cb;
+ beam->recv_ctx = ctx;
+ apr_thread_mutex_unlock(beam->lock);
+}
+
+void h2_beam_on_send(h2_bucket_beam *beam,
+ h2_beam_ev_callback *send_cb, void *ctx)
+{
+ apr_thread_mutex_lock(beam->lock);
+ beam->send_cb = send_cb;
+ beam->send_ctx = ctx;
+ apr_thread_mutex_unlock(beam->lock);
+}
+
+void h2_beam_on_was_empty(h2_bucket_beam *beam,
+ h2_beam_ev_callback *was_empty_cb, void *ctx)
+{
+ apr_thread_mutex_lock(beam->lock);
+ beam->was_empty_cb = was_empty_cb;
+ beam->was_empty_ctx = ctx;
+ apr_thread_mutex_unlock(beam->lock);
+}
+
+
+static apr_off_t get_buffered_data_len(h2_bucket_beam *beam)
+{
+ apr_bucket *b;
+ apr_off_t l = 0;
+
+ for (b = H2_BLIST_FIRST(&beam->buckets_to_send);
+ b != H2_BLIST_SENTINEL(&beam->buckets_to_send);
+ b = APR_BUCKET_NEXT(b)) {
+ /* should all have determinate length */
+ l += b->length;
+ }
+ return l;
+}
+
+apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam)
+{
+ apr_off_t l = 0;
+
+ apr_thread_mutex_lock(beam->lock);
+ l = get_buffered_data_len(beam);
+ apr_thread_mutex_unlock(beam->lock);
+ return l;
+}
+
+apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam)
+{
+ apr_bucket *b;
+ apr_off_t l = 0;
+
+ apr_thread_mutex_lock(beam->lock);
+ for (b = H2_BLIST_FIRST(&beam->buckets_to_send);
+ b != H2_BLIST_SENTINEL(&beam->buckets_to_send);
+ b = APR_BUCKET_NEXT(b)) {
+ l += bucket_mem_used(b);
+ }
+ apr_thread_mutex_unlock(beam->lock);
+ return l;
+}
+
+int h2_beam_empty(h2_bucket_beam *beam)
+{
+ int empty = 1;
+
+ apr_thread_mutex_lock(beam->lock);
+ empty = buffer_is_empty(beam);
+ apr_thread_mutex_unlock(beam->lock);
+ return empty;
+}
+
+int h2_beam_report_consumption(h2_bucket_beam *beam)
+{
+ int rv = 0;
+
+ apr_thread_mutex_lock(beam->lock);
+ rv = report_consumption(beam, 1);
+ apr_thread_mutex_unlock(beam->lock);
+ return rv;
+}