summaryrefslogtreecommitdiffstats
path: root/modules/http2/h2_bucket_beam.h
diff options
context:
space:
mode:
Diffstat (limited to 'modules/http2/h2_bucket_beam.h')
-rw-r--r--modules/http2/h2_bucket_beam.h375
1 files changed, 118 insertions, 257 deletions
diff --git a/modules/http2/h2_bucket_beam.h b/modules/http2/h2_bucket_beam.h
index f260762..c58ce98 100644
--- a/modules/http2/h2_bucket_beam.h
+++ b/modules/http2/h2_bucket_beam.h
@@ -17,191 +17,63 @@
#ifndef h2_bucket_beam_h
#define h2_bucket_beam_h
+#include "h2_conn_ctx.h"
+
struct apr_thread_mutex_t;
struct apr_thread_cond_t;
-/*******************************************************************************
- * apr_bucket list without bells and whistles
- ******************************************************************************/
-
-/**
- * h2_blist can hold a list of buckets just like apr_bucket_brigade, but
- * does not to any allocations or related features.
- */
-typedef struct {
- APR_RING_HEAD(h2_bucket_list, apr_bucket) list;
-} h2_blist;
-
-#define H2_BLIST_INIT(b) APR_RING_INIT(&(b)->list, apr_bucket, link);
-#define H2_BLIST_SENTINEL(b) APR_RING_SENTINEL(&(b)->list, apr_bucket, link)
-#define H2_BLIST_EMPTY(b) APR_RING_EMPTY(&(b)->list, apr_bucket, link)
-#define H2_BLIST_FIRST(b) APR_RING_FIRST(&(b)->list)
-#define H2_BLIST_LAST(b) APR_RING_LAST(&(b)->list)
-#define H2_BLIST_INSERT_HEAD(b, e) do { \
- apr_bucket *ap__b = (e); \
- APR_RING_INSERT_HEAD(&(b)->list, ap__b, apr_bucket, link); \
- } while (0)
-#define H2_BLIST_INSERT_TAIL(b, e) do { \
- apr_bucket *ap__b = (e); \
- APR_RING_INSERT_TAIL(&(b)->list, ap__b, apr_bucket, link); \
- } while (0)
-#define H2_BLIST_CONCAT(a, b) do { \
- APR_RING_CONCAT(&(a)->list, &(b)->list, apr_bucket, link); \
- } while (0)
-#define H2_BLIST_PREPEND(a, b) do { \
- APR_RING_PREPEND(&(a)->list, &(b)->list, apr_bucket, link); \
- } while (0)
-
-/*******************************************************************************
- * h2_bucket_beam
- ******************************************************************************/
-
/**
* A h2_bucket_beam solves the task of transferring buckets, esp. their data,
- * across threads with zero buffer copies.
- *
- * When a thread, let's call it the sender thread, wants to send buckets to
- * another, the green thread, it creates a h2_bucket_beam and adds buckets
- * via the h2_beam_send(). It gives the beam to the green thread which then
- * can receive buckets into its own brigade via h2_beam_receive().
- *
- * Sending and receiving can happen concurrently.
- *
- * The beam can limit the amount of data it accepts via the buffer_size. This
- * can also be adjusted during its lifetime. Sends and receives can be done blocking.
- * A timeout can be set for such blocks.
- *
- * Care needs to be taken when terminating the beam. The beam registers at
- * the pool it was created with and will cleanup after itself. However, if
- * received buckets do still exist, already freed memory might be accessed.
- * The beam does a assertion on this condition.
- *
- * The proper way of shutting down a beam is to first make sure there are no
- * more green buckets out there, then cleanup the beam to purge eventually
- * still existing sender buckets and then, possibly, terminate the beam itself
- * (or the pool it was created with).
- *
- * The following restrictions apply to bucket transport:
- * - only EOS and FLUSH meta buckets are copied through. All other meta buckets
- * are kept in the beams hold.
- * - all kind of data buckets are transported through:
- * - transient buckets are converted to heap ones on send
- * - heap and pool buckets require no extra handling
- * - buckets with indeterminate length are read on send
- * - file buckets will transfer the file itself into a new bucket, if allowed
- * - all other buckets are read on send to make sure data is present
- *
- * This assures that when the sender thread sends its sender buckets, the data
- * is made accessible while still on the sender side. The sender bucket then enters
- * the beams hold storage.
- * When the green thread calls receive, sender buckets in the hold are wrapped
- * into special beam buckets. Beam buckets on read present the data directly
- * from the internal sender one, but otherwise live on the green side. When a
- * beam bucket gets destroyed, it notifies its beam that the corresponding
- * sender bucket from the hold may be destroyed.
- * Since the destruction of green buckets happens in the green thread, any
- * corresponding sender bucket can not immediately be destroyed, as that would
- * result in race conditions.
- * Instead, the beam transfers such sender buckets from the hold to the purge
- * storage. Next time there is a call from the sender side, the buckets in
- * purge will be deleted.
- *
- * There are callbacks that can be registesender with a beam:
- * - a "consumed" callback that gets called on the sender side with the
- * amount of data that has been received by the green side. The amount
- * is a delta from the last callback invocation. The sender side can trigger
- * these callbacks by calling h2_beam_send() with a NULL brigade.
- * - a "can_beam_file" callback that can prohibit the transfer of file handles
- * through the beam. This will cause file buckets to be read on send and
- * its data buffer will then be transports just like a heap bucket would.
- * When no callback is registered, no restrictions apply and all files are
- * passed through.
- * File handles transfersender to the green side will stay there until the
- * receiving brigade's pool is destroyed/cleared. If the pool lives very
- * long or if many different files are beamed, the process might run out
- * of available file handles.
- *
- * The name "beam" of course is inspired by good old transporter
- * technology where humans are kept inside the transporter's memory
- * buffers until the transmission is complete. Star gates use a similar trick.
+ * across threads with as little copying as possible.
*/
-typedef void h2_beam_mutex_leave(void *ctx, struct apr_thread_mutex_t *lock);
-
-typedef struct {
- apr_thread_mutex_t *mutex;
- h2_beam_mutex_leave *leave;
- void *leave_ctx;
-} h2_beam_lock;
-
typedef struct h2_bucket_beam h2_bucket_beam;
-typedef apr_status_t h2_beam_mutex_enter(void *ctx, h2_beam_lock *pbl);
-
typedef void h2_beam_io_callback(void *ctx, h2_bucket_beam *beam,
apr_off_t bytes);
typedef void h2_beam_ev_callback(void *ctx, h2_bucket_beam *beam);
-typedef struct h2_beam_proxy h2_beam_proxy;
-typedef struct {
- APR_RING_HEAD(h2_beam_proxy_list, h2_beam_proxy) list;
-} h2_bproxy_list;
-
-typedef int h2_beam_can_beam_callback(void *ctx, h2_bucket_beam *beam,
- apr_file_t *file);
-
-typedef enum {
- H2_BEAM_OWNER_SEND,
- H2_BEAM_OWNER_RECV
-} h2_beam_owner_t;
-
/**
- * Will deny all transfer of apr_file_t across the beam and force
- * a data copy instead.
+ * h2_blist can hold a list of buckets just like apr_bucket_brigade, but
+ * does not to any allocations or related features.
*/
-int h2_beam_no_files(void *ctx, h2_bucket_beam *beam, apr_file_t *file);
+typedef struct {
+ APR_RING_HEAD(h2_bucket_list, apr_bucket) list;
+} h2_blist;
struct h2_bucket_beam {
int id;
- const char *tag;
+ const char *name;
+ conn_rec *from;
apr_pool_t *pool;
- h2_beam_owner_t owner;
- h2_blist send_list;
- h2_blist hold_list;
- h2_blist purge_list;
- apr_bucket_brigade *recv_buffer;
- h2_bproxy_list proxies;
- apr_pool_t *send_pool;
- apr_pool_t *recv_pool;
-
+ h2_blist buckets_to_send;
+ h2_blist buckets_consumed;
+ h2_blist buckets_eor;
+
apr_size_t max_buf_size;
apr_interval_time_t timeout;
- apr_off_t sent_bytes; /* amount of bytes send */
- apr_off_t received_bytes; /* amount of bytes received */
-
- apr_size_t buckets_sent; /* # of beam buckets sent */
- apr_size_t files_beamed; /* how many file handles have been set aside */
-
- unsigned int aborted : 1;
- unsigned int closed : 1;
- unsigned int close_sent : 1;
- unsigned int tx_mem_limits : 1; /* only memory size counts on transfers */
+ int aborted;
+ int closed;
+ int tx_mem_limits; /* only memory size counts on transfers */
+ int copy_files;
struct apr_thread_mutex_t *lock;
struct apr_thread_cond_t *change;
- apr_off_t cons_bytes_reported; /* amount of bytes reported as consumed */
- h2_beam_ev_callback *cons_ev_cb;
- h2_beam_io_callback *cons_io_cb;
+ h2_beam_ev_callback *was_empty_cb; /* event: beam changed to non-empty in h2_beam_send() */
+ void *was_empty_ctx;
+ h2_beam_ev_callback *recv_cb; /* event: buckets were transfered in h2_beam_receive() */
+ void *recv_ctx;
+ h2_beam_ev_callback *send_cb; /* event: buckets were added in h2_beam_send() */
+ void *send_ctx;
+ h2_beam_ev_callback *eagain_cb; /* event: a receive results in ARP_EAGAIN */
+ void *eagain_ctx;
+
+ apr_off_t recv_bytes; /* amount of bytes transferred in h2_beam_receive() */
+ apr_off_t recv_bytes_reported; /* amount of bytes reported as received via callback */
+ h2_beam_io_callback *cons_io_cb; /* report: recv_bytes deltas for sender */
void *cons_ctx;
-
- apr_off_t prod_bytes_reported; /* amount of bytes reported as produced */
- h2_beam_io_callback *prod_io_cb;
- void *prod_ctx;
-
- h2_beam_can_beam_callback *can_beam_fn;
- void *can_beam_ctx;
};
/**
@@ -212,56 +84,66 @@ struct h2_bucket_beam {
* that is only used inside that same mutex.
*
* @param pbeam will hold the created beam on return
+ * @param c_from connection from which buchets are sent
* @param pool pool owning the beam, beam will cleanup when pool released
* @param id identifier of the beam
* @param tag tag identifying beam for logging
- * @param owner if the beam is owned by the sender or receiver, e.g. if
- * the pool owner is using this beam for sending or receiving
* @param buffer_size maximum memory footprint of buckets buffered in beam, or
* 0 for no limitation
* @param timeout timeout for blocking operations
*/
apr_status_t h2_beam_create(h2_bucket_beam **pbeam,
+ conn_rec *from,
apr_pool_t *pool,
int id, const char *tag,
- h2_beam_owner_t owner,
apr_size_t buffer_size,
apr_interval_time_t timeout);
/**
* Destroys the beam immediately without cleanup.
*/
-apr_status_t h2_beam_destroy(h2_bucket_beam *beam);
+apr_status_t h2_beam_destroy(h2_bucket_beam *beam, conn_rec *c);
/**
- * Send buckets from the given brigade through the beam. Will hold buckets
- * internally as long as they have not been processed by the receiving side.
- * All accepted buckets are removed from the given brigade. Will return with
- * APR_EAGAIN on non-blocking sends when not all buckets could be accepted.
- *
- * Call from the sender side only.
+ * Switch copying of file buckets on/off.
*/
-apr_status_t h2_beam_send(h2_bucket_beam *beam,
- apr_bucket_brigade *bb,
- apr_read_type_e block);
+void h2_beam_set_copy_files(h2_bucket_beam * beam, int enabled);
/**
- * Register the pool from which future buckets are send. This defines
- * the lifetime of the buckets, e.g. the pool should not be cleared/destroyed
- * until the data is no longer needed (or has been received).
+ * Send buckets from the given brigade through the beam.
+ * This can block of the amount of bucket data is above the buffer limit.
+ * @param beam the beam to add buckets to
+ * @param from the connection the sender operates on, must be the same as
+ * used to create the beam
+ * @param bb the brigade to take buckets from
+ * @param block if the sending should block when the buffer is full
+ * @param pwritten on return, contains the number of data bytes sent
+ * @return APR_SUCCESS when buckets were added to the beam. This can be
+ * a partial transfer and other buckets may still remain in bb
+ * APR_EAGAIN on non-blocking send when the buffer is full
+ * APR_TIMEUP on blocking semd that time out
+ * APR_ECONNABORTED when beam has been aborted
*/
-void h2_beam_send_from(h2_bucket_beam *beam, apr_pool_t *p);
+apr_status_t h2_beam_send(h2_bucket_beam *beam, conn_rec *from,
+ apr_bucket_brigade *bb,
+ apr_read_type_e block,
+ apr_off_t *pwritten);
/**
- * Receive buckets from the beam into the given brigade. Will return APR_EOF
- * when reading past an EOS bucket. Reads can be blocking until data is
- * available or the beam has been closed. Non-blocking calls return APR_EAGAIN
- * if no data is available.
- *
- * Call from the receiver side only.
+ * Receive buckets from the beam into the given brigade. The caller is
+ * operating on connection `to`.
+ * @param beam the beam to receive buckets from
+ * @param to the connection the receiver is working with
+ * @param bb the bucket brigade to append to
+ * @param block if the read should block when buckets are unavailable
+ * @param readbytes the amount of data the receiver wants
+ * @return APR_SUCCESS when buckets were appended
+ * APR_EAGAIN on non-blocking read when no buckets are available
+ * APR_TIMEUP on blocking reads that time out
+ * APR_ECONNABORTED when beam has been aborted
*/
-apr_status_t h2_beam_receive(h2_bucket_beam *beam,
- apr_bucket_brigade *green_buckets,
+apr_status_t h2_beam_receive(h2_bucket_beam *beam, conn_rec *to,
+ apr_bucket_brigade *bb,
apr_read_type_e block,
apr_off_t readbytes);
@@ -271,53 +153,27 @@ apr_status_t h2_beam_receive(h2_bucket_beam *beam,
int h2_beam_empty(h2_bucket_beam *beam);
/**
- * Determine if beam has handed out proxy buckets that are not destroyed.
- */
-int h2_beam_holds_proxies(h2_bucket_beam *beam);
-
-/**
- * Abort the beam. Will cleanup any buffered buckets and answer all send
- * and receives with APR_ECONNABORTED.
- *
- * Call from the sender side only.
- */
-void h2_beam_abort(h2_bucket_beam *beam);
-
-/**
- * Close the beam. Sending an EOS bucket serves the same purpose.
- *
- * Call from the sender side only.
- */
-apr_status_t h2_beam_close(h2_bucket_beam *beam);
-
-/**
- * Receives leaves the beam, e.g. will no longer read. This will
- * interrupt any sender blocked writing and fail future send.
- *
- * Call from the receiver side only.
+ * Abort the beam, either from receiving or sending side.
+ *
+ * @param beam the beam to abort
+ * @param c the connection the caller is working with
*/
-apr_status_t h2_beam_leave(h2_bucket_beam *beam);
-
-int h2_beam_is_closed(h2_bucket_beam *beam);
+void h2_beam_abort(h2_bucket_beam *beam, conn_rec *c);
/**
- * Return APR_SUCCESS when all buckets in transit have been handled.
- * When called with APR_BLOCK_READ and a mutex set, will wait until the green
- * side has consumed all data. Otherwise APR_EAGAIN is returned.
- * With clear_buffers set, any queued data is discarded.
- * If a timeout is set on the beam, waiting might also time out and
- * return APR_ETIMEUP.
+ * Close the beam. Make certain an EOS is sent.
*
- * Call from the sender side only.
+ * @param beam the beam to abort
+ * @param c the connection the caller is working with
*/
-apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block);
+void h2_beam_close(h2_bucket_beam *beam, conn_rec *c);
-/**
- * Set/get the timeout for blocking read/write operations. Only works
- * if a mutex has been set for the beam.
+/**
+ * Set/get the timeout for blocking sebd/receive operations.
*/
void h2_beam_timeout_set(h2_bucket_beam *beam,
apr_interval_time_t timeout);
+
apr_interval_time_t h2_beam_timeout_get(h2_bucket_beam *beam);
/**
@@ -332,7 +188,6 @@ apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam);
* amount of bytes that have been consumed by the receiver, since the
* last callback invocation or reset.
* @param beam the beam to set the callback on
- * @param ev_cb the callback or NULL, called when bytes are consumed
* @param io_cb the callback or NULL, called on sender with bytes consumed
* @param ctx the context to use in callback invocation
*
@@ -340,43 +195,58 @@ apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam);
* from any side.
*/
void h2_beam_on_consumed(h2_bucket_beam *beam,
- h2_beam_ev_callback *ev_cb,
h2_beam_io_callback *io_cb, void *ctx);
/**
- * Call any registered consumed handler, if any changes have happened
- * since the last invocation.
- * @return !=0 iff a handler has been called
- *
- * Needs to be invoked from the sending side.
+ * Register a callback to be invoked on the receiver side whenever
+ * buckets have been transfered in a h2_beam_receive() call.
+ * @param beam the beam to set the callback on
+ * @param recv_cb the callback or NULL, called when buckets are received
+ * @param ctx the context to use in callback invocation
*/
-int h2_beam_report_consumption(h2_bucket_beam *beam);
+void h2_beam_on_received(h2_bucket_beam *beam,
+ h2_beam_ev_callback *recv_cb, void *ctx);
/**
- * Register a callback to be invoked on the receiver side with the
- * amount of bytes that have been produces by the sender, since the
- * last callback invocation or reset.
+ * Register a callback to be invoked on the receiver side whenever
+ * APR_EAGAIN is being returned in h2_beam_receive().
* @param beam the beam to set the callback on
- * @param io_cb the callback or NULL, called on receiver with bytes produced
+ * @param egain_cb the callback or NULL, called before APR_EAGAIN is returned
* @param ctx the context to use in callback invocation
- *
- * Call from the receiver side, callbacks invoked on either side.
*/
-void h2_beam_on_produced(h2_bucket_beam *beam,
- h2_beam_io_callback *io_cb, void *ctx);
+void h2_beam_on_eagain(h2_bucket_beam *beam,
+ h2_beam_ev_callback *eagain_cb, void *ctx);
/**
- * Register a callback that may prevent a file from being beam as
- * file handle, forcing the file content to be copied. Then no callback
- * is set (NULL), file handles are transferred directly.
+ * Register a call back from the sender side to be invoked when send
+ * has added buckets to the beam.
+ * Unregister by passing a NULL on_send_cb.
* @param beam the beam to set the callback on
- * @param io_cb the callback or NULL, called on receiver with bytes produced
+ * @param on_send_cb the callback to invoke after buckets were added
* @param ctx the context to use in callback invocation
- *
- * Call from the receiver side, callbacks invoked on either side.
*/
-void h2_beam_on_file_beam(h2_bucket_beam *beam,
- h2_beam_can_beam_callback *cb, void *ctx);
+void h2_beam_on_send(h2_bucket_beam *beam,
+ h2_beam_ev_callback *on_send_cb, void *ctx);
+
+/**
+ * Register a call back from the sender side to be invoked when send
+ * has added to a previously empty beam.
+ * Unregister by passing a NULL was_empty_cb.
+ * @param beam the beam to set the callback on
+ * @param was_empty_cb the callback to invoke on blocked send
+ * @param ctx the context to use in callback invocation
+ */
+void h2_beam_on_was_empty(h2_bucket_beam *beam,
+ h2_beam_ev_callback *was_empty_cb, void *ctx);
+
+/**
+ * Call any registered consumed handler, if any changes have happened
+ * since the last invocation.
+ * @return !=0 iff a handler has been called
+ *
+ * Needs to be invoked from the sending side.
+ */
+int h2_beam_report_consumption(h2_bucket_beam *beam);
/**
* Get the amount of bytes currently buffered in the beam (unread).
@@ -389,18 +259,9 @@ apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam);
apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam);
/**
- * Return != 0 iff (some) data from the beam has been received.
+ * @return != 0 iff beam has been closed or has an EOS bucket buffered
+ * waiting to be received.
*/
-int h2_beam_was_received(h2_bucket_beam *beam);
-
-apr_size_t h2_beam_get_files_beamed(h2_bucket_beam *beam);
-
-typedef apr_bucket *h2_bucket_beamer(h2_bucket_beam *beam,
- apr_bucket_brigade *dest,
- const apr_bucket *src);
-
-void h2_register_bucket_beamer(h2_bucket_beamer *beamer);
-
-void h2_beam_log(h2_bucket_beam *beam, conn_rec *c, int level, const char *msg);
+int h2_beam_is_complete(h2_bucket_beam *beam);
#endif /* h2_bucket_beam_h */