/* Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
#include <apr_lib.h>
#include <apr_atomic.h>
#include <apr_strings.h>
#include <apr_time.h>
#include <apr_buckets.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>

#include <httpd.h>
#include <http_protocol.h>
#include <http_log.h>

#include "h2_private.h"
#include "h2_conn_ctx.h"
#include "h2_headers.h"
#include "h2_util.h"
#include "h2_bucket_beam.h"


#define H2_BLIST_INIT(b)        APR_RING_INIT(&(b)->list, apr_bucket, link);
#define H2_BLIST_SENTINEL(b)    APR_RING_SENTINEL(&(b)->list, apr_bucket, link)
#define H2_BLIST_EMPTY(b)       APR_RING_EMPTY(&(b)->list, apr_bucket, link)
#define H2_BLIST_FIRST(b)       APR_RING_FIRST(&(b)->list)
#define H2_BLIST_LAST(b)	APR_RING_LAST(&(b)->list)
#define H2_BLIST_INSERT_HEAD(b, e) do {				\
	apr_bucket *ap__b = (e);                                        \
	APR_RING_INSERT_HEAD(&(b)->list, ap__b, apr_bucket, link);	\
    } while (0)
#define H2_BLIST_INSERT_TAIL(b, e) do {				\
	apr_bucket *ap__b = (e);					\
	APR_RING_INSERT_TAIL(&(b)->list, ap__b, apr_bucket, link);	\
    } while (0)
#define H2_BLIST_CONCAT(a, b) do {					\
        APR_RING_CONCAT(&(a)->list, &(b)->list, apr_bucket, link);	\
    } while (0)
#define H2_BLIST_PREPEND(a, b) do {					\
        APR_RING_PREPEND(&(a)->list, &(b)->list, apr_bucket, link);	\
    } while (0)


static int buffer_is_empty(h2_bucket_beam *beam);
static apr_off_t get_buffered_data_len(h2_bucket_beam *beam);

static int h2_blist_count(h2_blist *blist)
{
    apr_bucket *b;
    int count = 0;

    for (b = H2_BLIST_FIRST(blist); b != H2_BLIST_SENTINEL(blist);
         b = APR_BUCKET_NEXT(b)) {
        ++count;
    }
    return count;
}

#define H2_BEAM_LOG(beam, c, level, rv, msg, bb) \
    do { \
        if (APLOG_C_IS_LEVEL((c),(level))) { \
            char buffer[4 * 1024]; \
            apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]); \
            len = bb? h2_util_bb_print(buffer, bmax, "", "", bb) : 0; \
            ap_log_cerror(APLOG_MARK, (level), rv, (c), \
                          "BEAM[%s,%s%sdata=%ld,buckets(send/consumed)=%d/%d]: %s %s", \
                          (beam)->name, \
                          (beam)->aborted? "aborted," : "", \
                          buffer_is_empty(beam)? "empty," : "", \
                          (long)get_buffered_data_len(beam), \
                          h2_blist_count(&(beam)->buckets_to_send), \
                          h2_blist_count(&(beam)->buckets_consumed), \
                          (msg), len? buffer : ""); \
        } \
    } while (0)


static int bucket_is_mmap(apr_bucket *b)
{
#if APR_HAS_MMAP
    return APR_BUCKET_IS_MMAP(b);
#else
    /* if it is not defined as enabled, it should always be no */
    return 0;
#endif
}

static apr_off_t bucket_mem_used(apr_bucket *b)
{
    if (APR_BUCKET_IS_FILE(b) || bucket_is_mmap(b)) {
        return 0;
    }
    else {
        /* should all have determinate length */
        return (apr_off_t)b->length;
    }
}

static int report_consumption(h2_bucket_beam *beam, int locked)
{
    int rv = 0;
    apr_off_t len = beam->recv_bytes - beam->recv_bytes_reported;
    h2_beam_io_callback *cb = beam->cons_io_cb;
     
    if (len > 0) {
        if (cb) {
            void *ctx = beam->cons_ctx;
            
            if (locked) apr_thread_mutex_unlock(beam->lock);
            cb(ctx, beam, len);
            if (locked) apr_thread_mutex_lock(beam->lock);
            rv = 1;
        }
        beam->recv_bytes_reported += len;
    }
    return rv;
}

static apr_size_t calc_buffered(h2_bucket_beam *beam)
{
    apr_size_t len = 0;
    apr_bucket *b;
    for (b = H2_BLIST_FIRST(&beam->buckets_to_send);
         b != H2_BLIST_SENTINEL(&beam->buckets_to_send);
         b = APR_BUCKET_NEXT(b)) {
        if (b->length == ((apr_size_t)-1)) {
            /* do not count */
        }
        else if (APR_BUCKET_IS_FILE(b) || bucket_is_mmap(b)) {
            /* if unread, has no real mem footprint. */
        }
        else {
            len += b->length;
        }
    }
    return len;
}

static void purge_consumed_buckets(h2_bucket_beam *beam)
{
    apr_bucket *b;
    /* delete all sender buckets in purge brigade, needs to be called
     * from sender thread only */
    while (!H2_BLIST_EMPTY(&beam->buckets_consumed)) {
        b = H2_BLIST_FIRST(&beam->buckets_consumed);
        apr_bucket_delete(b);
    }
}

static apr_size_t calc_space_left(h2_bucket_beam *beam)
{
    if (beam->max_buf_size > 0) {
        apr_size_t len = calc_buffered(beam);
        return (beam->max_buf_size > len? (beam->max_buf_size - len) : 0);
    }
    return APR_SIZE_MAX;
}

static int buffer_is_empty(h2_bucket_beam *beam)
{
    return H2_BLIST_EMPTY(&beam->buckets_to_send);
}

static apr_status_t wait_not_empty(h2_bucket_beam *beam, conn_rec *c, apr_read_type_e block)
{
    apr_status_t rv = APR_SUCCESS;
    
    while (buffer_is_empty(beam) && APR_SUCCESS == rv) {
        if (beam->aborted) {
            rv = APR_ECONNABORTED;
        }
        else if (beam->closed) {
            rv = APR_EOF;
        }
        else if (APR_BLOCK_READ != block) {
            rv = APR_EAGAIN;
        }
        else if (beam->timeout > 0) {
            H2_BEAM_LOG(beam, c, APLOG_TRACE2, rv, "wait_not_empty, timeout", NULL);
            rv = apr_thread_cond_timedwait(beam->change, beam->lock, beam->timeout);
        }
        else {
            H2_BEAM_LOG(beam, c, APLOG_TRACE2, rv, "wait_not_empty, forever", NULL);
            rv = apr_thread_cond_wait(beam->change, beam->lock);
        }
    }
    return rv;
}

static apr_status_t wait_not_full(h2_bucket_beam *beam, conn_rec *c,
                                  apr_read_type_e block,
                                  apr_size_t *pspace_left)
{
    apr_status_t rv = APR_SUCCESS;
    apr_size_t left;
    
    while (0 == (left = calc_space_left(beam)) && APR_SUCCESS == rv) {
        if (beam->aborted) {
            rv = APR_ECONNABORTED;
        }
        else if (block != APR_BLOCK_READ) {
            rv = APR_EAGAIN;
        }
        else {
            if (beam->timeout > 0) {
                H2_BEAM_LOG(beam, c, APLOG_TRACE2, rv, "wait_not_full, timeout", NULL);
                rv = apr_thread_cond_timedwait(beam->change, beam->lock, beam->timeout);
            }
            else {
                H2_BEAM_LOG(beam, c, APLOG_TRACE2, rv, "wait_not_full, forever", NULL);
                rv = apr_thread_cond_wait(beam->change, beam->lock);
            }
        }
    }
    *pspace_left = left;
    return rv;
}

static void h2_blist_cleanup(h2_blist *bl)
{
    apr_bucket *e;

    while (!H2_BLIST_EMPTY(bl)) {
        e = H2_BLIST_FIRST(bl);
        apr_bucket_delete(e);
    }
}

static void beam_shutdown(h2_bucket_beam *beam, apr_shutdown_how_e how)
{
    if (!beam->pool) {
        /* pool being cleared already */
        return;
    }

    /* shutdown both receiver and sender? */
    if (how == APR_SHUTDOWN_READWRITE) {
        beam->cons_io_cb = NULL;
        beam->recv_cb = NULL;
    }

    /* shutdown sender (or both)? */
    if (how != APR_SHUTDOWN_READ) {
        h2_blist_cleanup(&beam->buckets_to_send);
        purge_consumed_buckets(beam);
    }
}

static apr_status_t beam_cleanup(void *data)
{
    h2_bucket_beam *beam = data;
    beam_shutdown(beam, APR_SHUTDOWN_READWRITE);
    beam->pool = NULL; /* the pool is clearing now */
    return APR_SUCCESS;
}

apr_status_t h2_beam_destroy(h2_bucket_beam *beam, conn_rec *c)
{
    if (beam->pool) {
        H2_BEAM_LOG(beam, c, APLOG_TRACE2, 0, "destroy", NULL);
        apr_pool_cleanup_run(beam->pool, beam, beam_cleanup);
    }
    H2_BEAM_LOG(beam, c, APLOG_TRACE2, 0, "destroyed", NULL);
    return APR_SUCCESS;
}

apr_status_t h2_beam_create(h2_bucket_beam **pbeam, conn_rec *from,
                            apr_pool_t *pool, int id, const char *tag,
                            apr_size_t max_buf_size,
                            apr_interval_time_t timeout)
{
    h2_bucket_beam *beam;
    h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(from);
    apr_status_t rv;
    
    beam = apr_pcalloc(pool, sizeof(*beam));
    beam->pool = pool;
    beam->from = from;
    beam->id = id;
    beam->name = apr_psprintf(pool, "%s-%d-%s",
                              conn_ctx->id, id, tag);

    H2_BLIST_INIT(&beam->buckets_to_send);
    H2_BLIST_INIT(&beam->buckets_consumed);
    beam->tx_mem_limits = 1;
    beam->max_buf_size = max_buf_size;
    beam->timeout = timeout;

    rv = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT, pool);
    if (APR_SUCCESS != rv) goto cleanup;
    rv = apr_thread_cond_create(&beam->change, pool);
    if (APR_SUCCESS != rv) goto cleanup;
    apr_pool_pre_cleanup_register(pool, beam, beam_cleanup);

cleanup:
    H2_BEAM_LOG(beam, from, APLOG_TRACE2, rv, "created", NULL);
    *pbeam = (APR_SUCCESS == rv)? beam : NULL;
    return rv;
}

void h2_beam_buffer_size_set(h2_bucket_beam *beam, apr_size_t buffer_size)
{
    apr_thread_mutex_lock(beam->lock);
    beam->max_buf_size = buffer_size;
    apr_thread_mutex_unlock(beam->lock);
}

void h2_beam_set_copy_files(h2_bucket_beam * beam, int enabled)
{
    apr_thread_mutex_lock(beam->lock);
    beam->copy_files = enabled;
    apr_thread_mutex_unlock(beam->lock);
}

apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam)
{
    apr_size_t buffer_size = 0;
    
    apr_thread_mutex_lock(beam->lock);
    buffer_size = beam->max_buf_size;
    apr_thread_mutex_unlock(beam->lock);
    return buffer_size;
}

apr_interval_time_t h2_beam_timeout_get(h2_bucket_beam *beam)
{
    apr_interval_time_t timeout;

    apr_thread_mutex_lock(beam->lock);
    timeout = beam->timeout;
    apr_thread_mutex_unlock(beam->lock);
    return timeout;
}

void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout)
{
    apr_thread_mutex_lock(beam->lock);
    beam->timeout = timeout;
    apr_thread_mutex_unlock(beam->lock);
}

void h2_beam_abort(h2_bucket_beam *beam, conn_rec *c)
{
    apr_thread_mutex_lock(beam->lock);
    beam->aborted = 1;
    if (c == beam->from) {
        /* sender aborts */
        if (beam->send_cb) {
            beam->send_cb(beam->send_ctx, beam);
        }
        if (beam->was_empty_cb && buffer_is_empty(beam)) {
            beam->was_empty_cb(beam->was_empty_ctx, beam);
        }
        /* no more consumption reporting to sender */
        report_consumption(beam, 1);
        beam->cons_ctx = NULL;

        beam_shutdown(beam, APR_SHUTDOWN_WRITE);
    }
    else {
        /* receiver aborts */
        beam_shutdown(beam, APR_SHUTDOWN_READ);
    }
    apr_thread_cond_broadcast(beam->change);
    apr_thread_mutex_unlock(beam->lock);
}

void h2_beam_close(h2_bucket_beam *beam, conn_rec *c)
{
    apr_thread_mutex_lock(beam->lock);
    if (!beam->closed) {
        /* should only be called from sender */
        ap_assert(c == beam->from);
        beam->closed = 1;
        if (beam->send_cb) {
            beam->send_cb(beam->send_ctx, beam);
        }
        if (beam->was_empty_cb && buffer_is_empty(beam)) {
            beam->was_empty_cb(beam->was_empty_ctx, beam);
        }
        apr_thread_cond_broadcast(beam->change);
    }
    apr_thread_mutex_unlock(beam->lock);
}

static apr_status_t append_bucket(h2_bucket_beam *beam,
                                  apr_bucket_brigade *bb,
                                  apr_read_type_e block,
                                  apr_size_t *pspace_left,
                                  apr_off_t *pwritten)
{
    apr_bucket *b;
    const char *data;
    apr_size_t len;
    apr_status_t rv = APR_SUCCESS;
    int can_beam = 0;
    
    (void)block;
    if (beam->aborted) {
        rv = APR_ECONNABORTED;
        goto cleanup;
    }

    ap_assert(beam->pool);

    b = APR_BRIGADE_FIRST(bb);
    if (APR_BUCKET_IS_METADATA(b)) {
        APR_BUCKET_REMOVE(b);
        apr_bucket_setaside(b, beam->pool);
        H2_BLIST_INSERT_TAIL(&beam->buckets_to_send, b);
        goto cleanup;
    }
    /* non meta bucket */

    /* in case of indeterminate length, we need to read the bucket,
     * so that it transforms itself into something stable. */
    if (b->length == ((apr_size_t)-1)) {
        rv = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
        if (rv != APR_SUCCESS) goto cleanup;
    }

    if (APR_BUCKET_IS_FILE(b)) {
        /* For file buckets the problem is their internal readpool that
         * is used on the first read to allocate buffer/mmap.
         * Since setting aside a file bucket will de-register the
         * file cleanup function from the previous pool, we need to
         * call that only from the sender thread.
         *
         * Currently, we do not handle file bucket with refcount > 1 as
         * the beam is then not in complete control of the file's lifetime.
         * Which results in the bug that a file get closed by the receiver
         * while the sender or the beam still have buckets using it. 
         * 
         * Additionally, we allow callbacks to prevent beaming file
         * handles across. The use case for this is to limit the number 
         * of open file handles and rather use a less efficient beam
         * transport. */
        apr_bucket_file *bf = b->data;
        can_beam = !beam->copy_files && (bf->refcount.refcount == 1);
    }
    else if (bucket_is_mmap(b)) {
        can_beam = !beam->copy_files;
    }

    if (b->length == 0) {
        apr_bucket_delete(b);
        rv = APR_SUCCESS;
        goto cleanup;
    }

    if (!*pspace_left) {
        rv = APR_EAGAIN;
        goto cleanup;
    }

    /* bucket is accepted and added to beam->buckets_to_send */
    if (APR_BUCKET_IS_HEAP(b)) {
        /* For heap buckets, a read from a receiver thread is fine. The
         * data will be there and live until the bucket itself is
         * destroyed. */
        rv = apr_bucket_setaside(b, beam->pool);
        if (rv != APR_SUCCESS) goto cleanup;
    }
    else if (can_beam && (APR_BUCKET_IS_FILE(b) || bucket_is_mmap(b))) {
        rv = apr_bucket_setaside(b, beam->pool);
        if (rv != APR_SUCCESS) goto cleanup;
    }
    else {
        /* we know of no special shortcut to transfer the bucket to
         * another pool without copying. So we make it a heap bucket. */
        apr_bucket *b2;

        rv = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
        if (rv != APR_SUCCESS) goto cleanup;
        /* this allocates and copies data */
        b2 = apr_bucket_heap_create(data, len, NULL, bb->bucket_alloc);
        apr_bucket_delete(b);
        b = b2;
        APR_BRIGADE_INSERT_HEAD(bb, b);
    }
    
    APR_BUCKET_REMOVE(b);
    H2_BLIST_INSERT_TAIL(&beam->buckets_to_send, b);
    *pwritten += (apr_off_t)b->length;
    if (b->length > *pspace_left) {
        *pspace_left = 0;
    }
    else {
        *pspace_left -= b->length;
    }

cleanup:
    return rv;
}

apr_status_t h2_beam_send(h2_bucket_beam *beam, conn_rec *from,
                          apr_bucket_brigade *sender_bb, 
                          apr_read_type_e block,
                          apr_off_t *pwritten)
{
    apr_status_t rv = APR_SUCCESS;
    apr_size_t space_left = 0;
    int was_empty;

    ap_assert(beam->pool);

    /* Called from the sender thread to add buckets to the beam */
    apr_thread_mutex_lock(beam->lock);
    ap_assert(beam->from == from);
    ap_assert(sender_bb);
    H2_BEAM_LOG(beam, from, APLOG_TRACE2, rv, "start send", sender_bb);
    purge_consumed_buckets(beam);
    *pwritten = 0;
    was_empty = buffer_is_empty(beam);

    space_left = calc_space_left(beam);
    while (!APR_BRIGADE_EMPTY(sender_bb) && APR_SUCCESS == rv) {
        rv = append_bucket(beam, sender_bb, block, &space_left, pwritten);
        if (beam->aborted) {
            goto cleanup;
        }
        else if (APR_EAGAIN == rv) {
            /* bucket was not added, as beam buffer has no space left.
             * Trigger event callbacks, so receiver can know there is something
             * to receive before we do a conditional wait. */
            purge_consumed_buckets(beam);
            if (beam->send_cb) {
                beam->send_cb(beam->send_ctx, beam);
            }
            if (was_empty && beam->was_empty_cb) {
                beam->was_empty_cb(beam->was_empty_ctx, beam);
            }
            rv = wait_not_full(beam, from, block, &space_left);
            if (APR_SUCCESS != rv) {
                break;
            }
            was_empty = buffer_is_empty(beam);
        }
    }

cleanup:
    if (beam->send_cb && !buffer_is_empty(beam)) {
        beam->send_cb(beam->send_ctx, beam);
    }
    if (was_empty && beam->was_empty_cb && !buffer_is_empty(beam)) {
        beam->was_empty_cb(beam->was_empty_ctx, beam);
    }
    apr_thread_cond_broadcast(beam->change);

    report_consumption(beam, 1);
    if (beam->aborted) {
        rv = APR_ECONNABORTED;
    }
    H2_BEAM_LOG(beam, from, APLOG_TRACE2, rv, "end send", sender_bb);
    apr_thread_mutex_unlock(beam->lock);
    return rv;
}

apr_status_t h2_beam_receive(h2_bucket_beam *beam,
                             conn_rec *to,
                             apr_bucket_brigade *bb, 
                             apr_read_type_e block,
                             apr_off_t readbytes)
{
    apr_bucket *bsender, *brecv, *ng;
    int transferred = 0;
    apr_status_t rv = APR_SUCCESS;
    apr_off_t remain;
    int consumed_buckets = 0;

    apr_thread_mutex_lock(beam->lock);
    H2_BEAM_LOG(beam, to, APLOG_TRACE2, 0, "start receive", bb);
    if (readbytes <= 0) {
        readbytes = (apr_off_t)APR_SIZE_MAX;
    }
    remain = readbytes;

transfer:
    if (beam->aborted) {
        beam_shutdown(beam, APR_SHUTDOWN_READ);
        rv = APR_ECONNABORTED;
        goto leave;
    }

    ap_assert(beam->pool);

    /* transfer from our sender brigade, transforming sender buckets to
     * receiver ones until we have enough */
    while (remain >= 0 && !H2_BLIST_EMPTY(&beam->buckets_to_send)) {

        brecv = NULL;
        bsender = H2_BLIST_FIRST(&beam->buckets_to_send);
        if (bsender->length > 0 && remain <= 0) {
            break;
        }

        if (APR_BUCKET_IS_METADATA(bsender)) {
            /* we need a real copy into the receivers bucket_alloc */
            if (APR_BUCKET_IS_EOS(bsender)) {
                /* this closes the beam */
                beam->closed = 1;
                brecv = apr_bucket_eos_create(bb->bucket_alloc);
            }
            else if (APR_BUCKET_IS_FLUSH(bsender)) {
                brecv = apr_bucket_flush_create(bb->bucket_alloc);
            }
#if AP_HAS_RESPONSE_BUCKETS
            else if (AP_BUCKET_IS_RESPONSE(bsender)) {
                brecv = ap_bucket_response_clone(bsender, bb->p, bb->bucket_alloc);
            }
            else if (AP_BUCKET_IS_REQUEST(bsender)) {
                brecv = ap_bucket_request_clone(bsender, bb->p, bb->bucket_alloc);
            }
            else if (AP_BUCKET_IS_HEADERS(bsender)) {
                brecv = ap_bucket_headers_clone(bsender, bb->p, bb->bucket_alloc);
            }
#else
            else if (H2_BUCKET_IS_HEADERS(bsender)) {
                brecv = h2_bucket_headers_clone(bsender, bb->p, bb->bucket_alloc);
            }
#endif /* AP_HAS_RESPONSE_BUCKETS */
            else if (AP_BUCKET_IS_ERROR(bsender)) {
                ap_bucket_error *eb = bsender->data;
                brecv = ap_bucket_error_create(eb->status, eb->data,
                                               bb->p, bb->bucket_alloc);
            }
        }
        else if (bsender->length == 0) {
            /* nop */
        }
#if APR_HAS_MMAP
        else if (APR_BUCKET_IS_MMAP(bsender)) {
            apr_bucket_mmap *bmmap = bsender->data;
            apr_mmap_t *mmap;
            rv = apr_mmap_dup(&mmap, bmmap->mmap, bb->p);
            if (rv != APR_SUCCESS) goto leave;
            brecv = apr_bucket_mmap_create(mmap, bsender->start, bsender->length, bb->bucket_alloc);
        }
#endif
        else if (APR_BUCKET_IS_FILE(bsender)) {
            /* This is setaside into the target brigade pool so that
             * any read operation messes with that pool and not
             * the sender one. */
            apr_bucket_file *f = (apr_bucket_file *)bsender->data;
            apr_file_t *fd = f->fd;
            int setaside = (f->readpool != bb->p);

            if (setaside) {
                rv = apr_file_setaside(&fd, fd, bb->p);
                if (rv != APR_SUCCESS) goto leave;
            }
            ng = apr_brigade_insert_file(bb, fd, bsender->start, (apr_off_t)bsender->length,
                                         bb->p);
#if APR_HAS_MMAP
            /* disable mmap handling as this leads to segfaults when
             * the underlying file is changed while memory pointer has
             * been handed out. See also PR 59348 */
            apr_bucket_file_enable_mmap(ng, 0);
#endif
            remain -= bsender->length;
            ++transferred;
        }
        else {
            const char *data;
            apr_size_t dlen;
            /* we did that when the bucket was added, so this should
             * give us the same data as before without changing the bucket
             * or anything (pool) connected to it. */
            rv = apr_bucket_read(bsender, &data, &dlen, APR_BLOCK_READ);
            if (rv != APR_SUCCESS) goto leave;
            rv = apr_brigade_write(bb, NULL, NULL, data, dlen);
            if (rv != APR_SUCCESS) goto leave;

            remain -= dlen;
            ++transferred;
        }

        if (brecv) {
            /* we have a proxy that we can give the receiver */
            APR_BRIGADE_INSERT_TAIL(bb, brecv);
            remain -= brecv->length;
            ++transferred;
        }
        APR_BUCKET_REMOVE(bsender);
        H2_BLIST_INSERT_TAIL(&beam->buckets_consumed, bsender);
        beam->recv_bytes += bsender->length;
        ++consumed_buckets;
    }

    if (beam->recv_cb && consumed_buckets > 0) {
        beam->recv_cb(beam->recv_ctx, beam);
    }

    if (transferred) {
        apr_thread_cond_broadcast(beam->change);
        rv = APR_SUCCESS;
    }
    else if (beam->aborted) {
        rv = APR_ECONNABORTED;
    }
    else if (beam->closed) {
        rv = APR_EOF;
    }
    else {
        rv = wait_not_empty(beam, to, block);
        if (rv != APR_SUCCESS) {
            goto leave;
        }
        goto transfer;
    }

leave:
    H2_BEAM_LOG(beam, to, APLOG_TRACE2, rv, "end receive", bb);
    apr_thread_mutex_unlock(beam->lock);
    return rv;
}

void h2_beam_on_consumed(h2_bucket_beam *beam, 
                         h2_beam_io_callback *io_cb, void *ctx)
{
    apr_thread_mutex_lock(beam->lock);
    beam->cons_io_cb = io_cb;
    beam->cons_ctx = ctx;
    apr_thread_mutex_unlock(beam->lock);
}

void h2_beam_on_received(h2_bucket_beam *beam,
                         h2_beam_ev_callback *recv_cb, void *ctx)
{
    apr_thread_mutex_lock(beam->lock);
    beam->recv_cb = recv_cb;
    beam->recv_ctx = ctx;
    apr_thread_mutex_unlock(beam->lock);
}

void h2_beam_on_send(h2_bucket_beam *beam,
                     h2_beam_ev_callback *send_cb, void *ctx)
{
    apr_thread_mutex_lock(beam->lock);
    beam->send_cb = send_cb;
    beam->send_ctx = ctx;
    apr_thread_mutex_unlock(beam->lock);
}

void h2_beam_on_was_empty(h2_bucket_beam *beam,
                          h2_beam_ev_callback *was_empty_cb, void *ctx)
{
    apr_thread_mutex_lock(beam->lock);
    beam->was_empty_cb = was_empty_cb;
    beam->was_empty_ctx = ctx;
    apr_thread_mutex_unlock(beam->lock);
}


static apr_off_t get_buffered_data_len(h2_bucket_beam *beam)
{
    apr_bucket *b;
    apr_off_t l = 0;

    for (b = H2_BLIST_FIRST(&beam->buckets_to_send);
        b != H2_BLIST_SENTINEL(&beam->buckets_to_send);
        b = APR_BUCKET_NEXT(b)) {
        /* should all have determinate length */
        l += b->length;
    }
    return l;
}

apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam)
{
    apr_off_t l = 0;

    apr_thread_mutex_lock(beam->lock);
    l = get_buffered_data_len(beam);
    apr_thread_mutex_unlock(beam->lock);
    return l;
}

apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam)
{
    apr_bucket *b;
    apr_off_t l = 0;

    apr_thread_mutex_lock(beam->lock);
    for (b = H2_BLIST_FIRST(&beam->buckets_to_send);
        b != H2_BLIST_SENTINEL(&beam->buckets_to_send);
        b = APR_BUCKET_NEXT(b)) {
        l += bucket_mem_used(b);
    }
    apr_thread_mutex_unlock(beam->lock);
    return l;
}

int h2_beam_empty(h2_bucket_beam *beam)
{
    int empty = 1;

    apr_thread_mutex_lock(beam->lock);
    empty = buffer_is_empty(beam);
    apr_thread_mutex_unlock(beam->lock);
    return empty;
}

int h2_beam_report_consumption(h2_bucket_beam *beam)
{
    int rv = 0;

    apr_thread_mutex_lock(beam->lock);
    rv = report_consumption(beam, 1);
    apr_thread_mutex_unlock(beam->lock);
    return rv;
}