diff options
Diffstat (limited to 'modules/http2/h2_bucket_beam.h')
-rw-r--r-- | modules/http2/h2_bucket_beam.h | 375 |
1 files changed, 118 insertions, 257 deletions
diff --git a/modules/http2/h2_bucket_beam.h b/modules/http2/h2_bucket_beam.h index f260762..c58ce98 100644 --- a/modules/http2/h2_bucket_beam.h +++ b/modules/http2/h2_bucket_beam.h @@ -17,191 +17,63 @@ #ifndef h2_bucket_beam_h #define h2_bucket_beam_h +#include "h2_conn_ctx.h" + struct apr_thread_mutex_t; struct apr_thread_cond_t; -/******************************************************************************* - * apr_bucket list without bells and whistles - ******************************************************************************/ - -/** - * h2_blist can hold a list of buckets just like apr_bucket_brigade, but - * does not to any allocations or related features. - */ -typedef struct { - APR_RING_HEAD(h2_bucket_list, apr_bucket) list; -} h2_blist; - -#define H2_BLIST_INIT(b) APR_RING_INIT(&(b)->list, apr_bucket, link); -#define H2_BLIST_SENTINEL(b) APR_RING_SENTINEL(&(b)->list, apr_bucket, link) -#define H2_BLIST_EMPTY(b) APR_RING_EMPTY(&(b)->list, apr_bucket, link) -#define H2_BLIST_FIRST(b) APR_RING_FIRST(&(b)->list) -#define H2_BLIST_LAST(b) APR_RING_LAST(&(b)->list) -#define H2_BLIST_INSERT_HEAD(b, e) do { \ - apr_bucket *ap__b = (e); \ - APR_RING_INSERT_HEAD(&(b)->list, ap__b, apr_bucket, link); \ - } while (0) -#define H2_BLIST_INSERT_TAIL(b, e) do { \ - apr_bucket *ap__b = (e); \ - APR_RING_INSERT_TAIL(&(b)->list, ap__b, apr_bucket, link); \ - } while (0) -#define H2_BLIST_CONCAT(a, b) do { \ - APR_RING_CONCAT(&(a)->list, &(b)->list, apr_bucket, link); \ - } while (0) -#define H2_BLIST_PREPEND(a, b) do { \ - APR_RING_PREPEND(&(a)->list, &(b)->list, apr_bucket, link); \ - } while (0) - -/******************************************************************************* - * h2_bucket_beam - ******************************************************************************/ - /** * A h2_bucket_beam solves the task of transferring buckets, esp. their data, - * across threads with zero buffer copies. - * - * When a thread, let's call it the sender thread, wants to send buckets to - * another, the green thread, it creates a h2_bucket_beam and adds buckets - * via the h2_beam_send(). It gives the beam to the green thread which then - * can receive buckets into its own brigade via h2_beam_receive(). - * - * Sending and receiving can happen concurrently. - * - * The beam can limit the amount of data it accepts via the buffer_size. This - * can also be adjusted during its lifetime. Sends and receives can be done blocking. - * A timeout can be set for such blocks. - * - * Care needs to be taken when terminating the beam. The beam registers at - * the pool it was created with and will cleanup after itself. However, if - * received buckets do still exist, already freed memory might be accessed. - * The beam does a assertion on this condition. - * - * The proper way of shutting down a beam is to first make sure there are no - * more green buckets out there, then cleanup the beam to purge eventually - * still existing sender buckets and then, possibly, terminate the beam itself - * (or the pool it was created with). - * - * The following restrictions apply to bucket transport: - * - only EOS and FLUSH meta buckets are copied through. All other meta buckets - * are kept in the beams hold. - * - all kind of data buckets are transported through: - * - transient buckets are converted to heap ones on send - * - heap and pool buckets require no extra handling - * - buckets with indeterminate length are read on send - * - file buckets will transfer the file itself into a new bucket, if allowed - * - all other buckets are read on send to make sure data is present - * - * This assures that when the sender thread sends its sender buckets, the data - * is made accessible while still on the sender side. The sender bucket then enters - * the beams hold storage. - * When the green thread calls receive, sender buckets in the hold are wrapped - * into special beam buckets. Beam buckets on read present the data directly - * from the internal sender one, but otherwise live on the green side. When a - * beam bucket gets destroyed, it notifies its beam that the corresponding - * sender bucket from the hold may be destroyed. - * Since the destruction of green buckets happens in the green thread, any - * corresponding sender bucket can not immediately be destroyed, as that would - * result in race conditions. - * Instead, the beam transfers such sender buckets from the hold to the purge - * storage. Next time there is a call from the sender side, the buckets in - * purge will be deleted. - * - * There are callbacks that can be registesender with a beam: - * - a "consumed" callback that gets called on the sender side with the - * amount of data that has been received by the green side. The amount - * is a delta from the last callback invocation. The sender side can trigger - * these callbacks by calling h2_beam_send() with a NULL brigade. - * - a "can_beam_file" callback that can prohibit the transfer of file handles - * through the beam. This will cause file buckets to be read on send and - * its data buffer will then be transports just like a heap bucket would. - * When no callback is registered, no restrictions apply and all files are - * passed through. - * File handles transfersender to the green side will stay there until the - * receiving brigade's pool is destroyed/cleared. If the pool lives very - * long or if many different files are beamed, the process might run out - * of available file handles. - * - * The name "beam" of course is inspired by good old transporter - * technology where humans are kept inside the transporter's memory - * buffers until the transmission is complete. Star gates use a similar trick. + * across threads with as little copying as possible. */ -typedef void h2_beam_mutex_leave(void *ctx, struct apr_thread_mutex_t *lock); - -typedef struct { - apr_thread_mutex_t *mutex; - h2_beam_mutex_leave *leave; - void *leave_ctx; -} h2_beam_lock; - typedef struct h2_bucket_beam h2_bucket_beam; -typedef apr_status_t h2_beam_mutex_enter(void *ctx, h2_beam_lock *pbl); - typedef void h2_beam_io_callback(void *ctx, h2_bucket_beam *beam, apr_off_t bytes); typedef void h2_beam_ev_callback(void *ctx, h2_bucket_beam *beam); -typedef struct h2_beam_proxy h2_beam_proxy; -typedef struct { - APR_RING_HEAD(h2_beam_proxy_list, h2_beam_proxy) list; -} h2_bproxy_list; - -typedef int h2_beam_can_beam_callback(void *ctx, h2_bucket_beam *beam, - apr_file_t *file); - -typedef enum { - H2_BEAM_OWNER_SEND, - H2_BEAM_OWNER_RECV -} h2_beam_owner_t; - /** - * Will deny all transfer of apr_file_t across the beam and force - * a data copy instead. + * h2_blist can hold a list of buckets just like apr_bucket_brigade, but + * does not to any allocations or related features. */ -int h2_beam_no_files(void *ctx, h2_bucket_beam *beam, apr_file_t *file); +typedef struct { + APR_RING_HEAD(h2_bucket_list, apr_bucket) list; +} h2_blist; struct h2_bucket_beam { int id; - const char *tag; + const char *name; + conn_rec *from; apr_pool_t *pool; - h2_beam_owner_t owner; - h2_blist send_list; - h2_blist hold_list; - h2_blist purge_list; - apr_bucket_brigade *recv_buffer; - h2_bproxy_list proxies; - apr_pool_t *send_pool; - apr_pool_t *recv_pool; - + h2_blist buckets_to_send; + h2_blist buckets_consumed; + h2_blist buckets_eor; + apr_size_t max_buf_size; apr_interval_time_t timeout; - apr_off_t sent_bytes; /* amount of bytes send */ - apr_off_t received_bytes; /* amount of bytes received */ - - apr_size_t buckets_sent; /* # of beam buckets sent */ - apr_size_t files_beamed; /* how many file handles have been set aside */ - - unsigned int aborted : 1; - unsigned int closed : 1; - unsigned int close_sent : 1; - unsigned int tx_mem_limits : 1; /* only memory size counts on transfers */ + int aborted; + int closed; + int tx_mem_limits; /* only memory size counts on transfers */ + int copy_files; struct apr_thread_mutex_t *lock; struct apr_thread_cond_t *change; - apr_off_t cons_bytes_reported; /* amount of bytes reported as consumed */ - h2_beam_ev_callback *cons_ev_cb; - h2_beam_io_callback *cons_io_cb; + h2_beam_ev_callback *was_empty_cb; /* event: beam changed to non-empty in h2_beam_send() */ + void *was_empty_ctx; + h2_beam_ev_callback *recv_cb; /* event: buckets were transfered in h2_beam_receive() */ + void *recv_ctx; + h2_beam_ev_callback *send_cb; /* event: buckets were added in h2_beam_send() */ + void *send_ctx; + h2_beam_ev_callback *eagain_cb; /* event: a receive results in ARP_EAGAIN */ + void *eagain_ctx; + + apr_off_t recv_bytes; /* amount of bytes transferred in h2_beam_receive() */ + apr_off_t recv_bytes_reported; /* amount of bytes reported as received via callback */ + h2_beam_io_callback *cons_io_cb; /* report: recv_bytes deltas for sender */ void *cons_ctx; - - apr_off_t prod_bytes_reported; /* amount of bytes reported as produced */ - h2_beam_io_callback *prod_io_cb; - void *prod_ctx; - - h2_beam_can_beam_callback *can_beam_fn; - void *can_beam_ctx; }; /** @@ -212,56 +84,66 @@ struct h2_bucket_beam { * that is only used inside that same mutex. * * @param pbeam will hold the created beam on return + * @param c_from connection from which buchets are sent * @param pool pool owning the beam, beam will cleanup when pool released * @param id identifier of the beam * @param tag tag identifying beam for logging - * @param owner if the beam is owned by the sender or receiver, e.g. if - * the pool owner is using this beam for sending or receiving * @param buffer_size maximum memory footprint of buckets buffered in beam, or * 0 for no limitation * @param timeout timeout for blocking operations */ apr_status_t h2_beam_create(h2_bucket_beam **pbeam, + conn_rec *from, apr_pool_t *pool, int id, const char *tag, - h2_beam_owner_t owner, apr_size_t buffer_size, apr_interval_time_t timeout); /** * Destroys the beam immediately without cleanup. */ -apr_status_t h2_beam_destroy(h2_bucket_beam *beam); +apr_status_t h2_beam_destroy(h2_bucket_beam *beam, conn_rec *c); /** - * Send buckets from the given brigade through the beam. Will hold buckets - * internally as long as they have not been processed by the receiving side. - * All accepted buckets are removed from the given brigade. Will return with - * APR_EAGAIN on non-blocking sends when not all buckets could be accepted. - * - * Call from the sender side only. + * Switch copying of file buckets on/off. */ -apr_status_t h2_beam_send(h2_bucket_beam *beam, - apr_bucket_brigade *bb, - apr_read_type_e block); +void h2_beam_set_copy_files(h2_bucket_beam * beam, int enabled); /** - * Register the pool from which future buckets are send. This defines - * the lifetime of the buckets, e.g. the pool should not be cleared/destroyed - * until the data is no longer needed (or has been received). + * Send buckets from the given brigade through the beam. + * This can block of the amount of bucket data is above the buffer limit. + * @param beam the beam to add buckets to + * @param from the connection the sender operates on, must be the same as + * used to create the beam + * @param bb the brigade to take buckets from + * @param block if the sending should block when the buffer is full + * @param pwritten on return, contains the number of data bytes sent + * @return APR_SUCCESS when buckets were added to the beam. This can be + * a partial transfer and other buckets may still remain in bb + * APR_EAGAIN on non-blocking send when the buffer is full + * APR_TIMEUP on blocking semd that time out + * APR_ECONNABORTED when beam has been aborted */ -void h2_beam_send_from(h2_bucket_beam *beam, apr_pool_t *p); +apr_status_t h2_beam_send(h2_bucket_beam *beam, conn_rec *from, + apr_bucket_brigade *bb, + apr_read_type_e block, + apr_off_t *pwritten); /** - * Receive buckets from the beam into the given brigade. Will return APR_EOF - * when reading past an EOS bucket. Reads can be blocking until data is - * available or the beam has been closed. Non-blocking calls return APR_EAGAIN - * if no data is available. - * - * Call from the receiver side only. + * Receive buckets from the beam into the given brigade. The caller is + * operating on connection `to`. + * @param beam the beam to receive buckets from + * @param to the connection the receiver is working with + * @param bb the bucket brigade to append to + * @param block if the read should block when buckets are unavailable + * @param readbytes the amount of data the receiver wants + * @return APR_SUCCESS when buckets were appended + * APR_EAGAIN on non-blocking read when no buckets are available + * APR_TIMEUP on blocking reads that time out + * APR_ECONNABORTED when beam has been aborted */ -apr_status_t h2_beam_receive(h2_bucket_beam *beam, - apr_bucket_brigade *green_buckets, +apr_status_t h2_beam_receive(h2_bucket_beam *beam, conn_rec *to, + apr_bucket_brigade *bb, apr_read_type_e block, apr_off_t readbytes); @@ -271,53 +153,27 @@ apr_status_t h2_beam_receive(h2_bucket_beam *beam, int h2_beam_empty(h2_bucket_beam *beam); /** - * Determine if beam has handed out proxy buckets that are not destroyed. - */ -int h2_beam_holds_proxies(h2_bucket_beam *beam); - -/** - * Abort the beam. Will cleanup any buffered buckets and answer all send - * and receives with APR_ECONNABORTED. - * - * Call from the sender side only. - */ -void h2_beam_abort(h2_bucket_beam *beam); - -/** - * Close the beam. Sending an EOS bucket serves the same purpose. - * - * Call from the sender side only. - */ -apr_status_t h2_beam_close(h2_bucket_beam *beam); - -/** - * Receives leaves the beam, e.g. will no longer read. This will - * interrupt any sender blocked writing and fail future send. - * - * Call from the receiver side only. + * Abort the beam, either from receiving or sending side. + * + * @param beam the beam to abort + * @param c the connection the caller is working with */ -apr_status_t h2_beam_leave(h2_bucket_beam *beam); - -int h2_beam_is_closed(h2_bucket_beam *beam); +void h2_beam_abort(h2_bucket_beam *beam, conn_rec *c); /** - * Return APR_SUCCESS when all buckets in transit have been handled. - * When called with APR_BLOCK_READ and a mutex set, will wait until the green - * side has consumed all data. Otherwise APR_EAGAIN is returned. - * With clear_buffers set, any queued data is discarded. - * If a timeout is set on the beam, waiting might also time out and - * return APR_ETIMEUP. + * Close the beam. Make certain an EOS is sent. * - * Call from the sender side only. + * @param beam the beam to abort + * @param c the connection the caller is working with */ -apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block); +void h2_beam_close(h2_bucket_beam *beam, conn_rec *c); -/** - * Set/get the timeout for blocking read/write operations. Only works - * if a mutex has been set for the beam. +/** + * Set/get the timeout for blocking sebd/receive operations. */ void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout); + apr_interval_time_t h2_beam_timeout_get(h2_bucket_beam *beam); /** @@ -332,7 +188,6 @@ apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam); * amount of bytes that have been consumed by the receiver, since the * last callback invocation or reset. * @param beam the beam to set the callback on - * @param ev_cb the callback or NULL, called when bytes are consumed * @param io_cb the callback or NULL, called on sender with bytes consumed * @param ctx the context to use in callback invocation * @@ -340,43 +195,58 @@ apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam); * from any side. */ void h2_beam_on_consumed(h2_bucket_beam *beam, - h2_beam_ev_callback *ev_cb, h2_beam_io_callback *io_cb, void *ctx); /** - * Call any registered consumed handler, if any changes have happened - * since the last invocation. - * @return !=0 iff a handler has been called - * - * Needs to be invoked from the sending side. + * Register a callback to be invoked on the receiver side whenever + * buckets have been transfered in a h2_beam_receive() call. + * @param beam the beam to set the callback on + * @param recv_cb the callback or NULL, called when buckets are received + * @param ctx the context to use in callback invocation */ -int h2_beam_report_consumption(h2_bucket_beam *beam); +void h2_beam_on_received(h2_bucket_beam *beam, + h2_beam_ev_callback *recv_cb, void *ctx); /** - * Register a callback to be invoked on the receiver side with the - * amount of bytes that have been produces by the sender, since the - * last callback invocation or reset. + * Register a callback to be invoked on the receiver side whenever + * APR_EAGAIN is being returned in h2_beam_receive(). * @param beam the beam to set the callback on - * @param io_cb the callback or NULL, called on receiver with bytes produced + * @param egain_cb the callback or NULL, called before APR_EAGAIN is returned * @param ctx the context to use in callback invocation - * - * Call from the receiver side, callbacks invoked on either side. */ -void h2_beam_on_produced(h2_bucket_beam *beam, - h2_beam_io_callback *io_cb, void *ctx); +void h2_beam_on_eagain(h2_bucket_beam *beam, + h2_beam_ev_callback *eagain_cb, void *ctx); /** - * Register a callback that may prevent a file from being beam as - * file handle, forcing the file content to be copied. Then no callback - * is set (NULL), file handles are transferred directly. + * Register a call back from the sender side to be invoked when send + * has added buckets to the beam. + * Unregister by passing a NULL on_send_cb. * @param beam the beam to set the callback on - * @param io_cb the callback or NULL, called on receiver with bytes produced + * @param on_send_cb the callback to invoke after buckets were added * @param ctx the context to use in callback invocation - * - * Call from the receiver side, callbacks invoked on either side. */ -void h2_beam_on_file_beam(h2_bucket_beam *beam, - h2_beam_can_beam_callback *cb, void *ctx); +void h2_beam_on_send(h2_bucket_beam *beam, + h2_beam_ev_callback *on_send_cb, void *ctx); + +/** + * Register a call back from the sender side to be invoked when send + * has added to a previously empty beam. + * Unregister by passing a NULL was_empty_cb. + * @param beam the beam to set the callback on + * @param was_empty_cb the callback to invoke on blocked send + * @param ctx the context to use in callback invocation + */ +void h2_beam_on_was_empty(h2_bucket_beam *beam, + h2_beam_ev_callback *was_empty_cb, void *ctx); + +/** + * Call any registered consumed handler, if any changes have happened + * since the last invocation. + * @return !=0 iff a handler has been called + * + * Needs to be invoked from the sending side. + */ +int h2_beam_report_consumption(h2_bucket_beam *beam); /** * Get the amount of bytes currently buffered in the beam (unread). @@ -389,18 +259,9 @@ apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam); apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam); /** - * Return != 0 iff (some) data from the beam has been received. + * @return != 0 iff beam has been closed or has an EOS bucket buffered + * waiting to be received. */ -int h2_beam_was_received(h2_bucket_beam *beam); - -apr_size_t h2_beam_get_files_beamed(h2_bucket_beam *beam); - -typedef apr_bucket *h2_bucket_beamer(h2_bucket_beam *beam, - apr_bucket_brigade *dest, - const apr_bucket *src); - -void h2_register_bucket_beamer(h2_bucket_beamer *beamer); - -void h2_beam_log(h2_bucket_beam *beam, conn_rec *c, int level, const char *msg); +int h2_beam_is_complete(h2_bucket_beam *beam); #endif /* h2_bucket_beam_h */ |