diff options
Diffstat (limited to 'src/dynbuf.c')
-rw-r--r-- | src/dynbuf.c | 131 |
1 files changed, 122 insertions, 9 deletions
diff --git a/src/dynbuf.c b/src/dynbuf.c index 712e334..aec9667 100644 --- a/src/dynbuf.c +++ b/src/dynbuf.c @@ -15,10 +15,12 @@ #include <string.h> #include <haproxy/api.h> +#include <haproxy/cfgparse.h> #include <haproxy/dynbuf.h> #include <haproxy/global.h> #include <haproxy/list.h> #include <haproxy/pool.h> +#include <haproxy/tools.h> struct pool_head *pool_head_buffer __read_mostly; @@ -28,13 +30,24 @@ int init_buffer() void *buffer; int thr; int done; + int i; pool_head_buffer = create_pool("buffer", global.tune.bufsize, MEM_F_SHARED|MEM_F_EXACT); if (!pool_head_buffer) return 0; - for (thr = 0; thr < MAX_THREADS; thr++) - LIST_INIT(&ha_thread_ctx[thr].buffer_wq); + /* make sure any change to the queues assignment isn't overlooked */ + BUG_ON(DB_PERMANENT - DB_UNLIKELY - 1 != DYNBUF_NBQ); + BUG_ON(DB_MUX_RX_Q < DB_SE_RX_Q || DB_MUX_RX_Q >= DYNBUF_NBQ); + BUG_ON(DB_SE_RX_Q < DB_CHANNEL_Q || DB_SE_RX_Q >= DYNBUF_NBQ); + BUG_ON(DB_CHANNEL_Q < DB_MUX_TX_Q || DB_CHANNEL_Q >= DYNBUF_NBQ); + BUG_ON(DB_MUX_TX_Q >= DYNBUF_NBQ); + + for (thr = 0; thr < MAX_THREADS; thr++) { + for (i = 0; i < DYNBUF_NBQ; i++) + LIST_INIT(&ha_thread_ctx[thr].buffer_wq[i]); + ha_thread_ctx[thr].bufq_map = 0; + } /* The reserved buffer is what we leave behind us. Thus we always need @@ -102,6 +115,7 @@ void buffer_dump(FILE *o, struct buffer *b, int from, int to) void __offer_buffers(void *from, unsigned int count) { struct buffer_wait *wait, *wait_back; + int q; /* For now, we consider that all objects need 1 buffer, so we can stop * waking up them once we have enough of them to eat all the available @@ -109,18 +123,117 @@ void __offer_buffers(void *from, unsigned int count) * other tasks, but that's a rough estimate. Similarly, for each cached * event we'll need 1 buffer. */ - list_for_each_entry_safe(wait, wait_back, &th_ctx->buffer_wq, list) { - if (!count) - break; - - if (wait->target == from || !wait->wakeup_cb(wait->target)) + for (q = 0; q < DYNBUF_NBQ; q++) { + if (!(th_ctx->bufq_map & (1 << q))) continue; + BUG_ON_HOT(LIST_ISEMPTY(&th_ctx->buffer_wq[q])); + + list_for_each_entry_safe(wait, wait_back, &th_ctx->buffer_wq[q], list) { + if (!count) + break; + + if (wait->target == from || !wait->wakeup_cb(wait->target)) + continue; + + LIST_DEL_INIT(&wait->list); + count--; + } + if (LIST_ISEMPTY(&th_ctx->buffer_wq[q])) + th_ctx->bufq_map &= ~(1 << q); + } +} + +/* config parser for global "tune.buffers.limit", accepts a number >= 0 */ +static int cfg_parse_tune_buffers_limit(char **args, int section_type, struct proxy *curpx, + const struct proxy *defpx, const char *file, int line, + char **err) +{ + int limit; - LIST_DEL_INIT(&wait->list); - count--; + if (too_many_args(1, args, err, NULL)) + return -1; + + limit = atoi(args[1]); + if (limit < 0) { + memprintf(err, "'%s' expects a non-negative number but got '%s'.", args[0], args[1]); + return -1; + } + + global.tune.buf_limit = limit; + if (global.tune.buf_limit) { + if (global.tune.buf_limit < 3) + global.tune.buf_limit = 3; } + + return 0; } +/* config parser for global "tune.buffers.reserve", accepts a number >= 0 */ +static int cfg_parse_tune_buffers_reserve(char **args, int section_type, struct proxy *curpx, + const struct proxy *defpx, const char *file, int line, + char **err) +{ + int reserve; + + if (too_many_args(1, args, err, NULL)) + return -1; + + reserve = atoi(args[1]); + if (reserve < 0) { + memprintf(err, "'%s' expects a non-negative number but got '%s'.", args[0], args[1]); + return -1; + } + + global.tune.reserved_bufs = reserve; + return 0; +} + +/* allocate emergency buffers for the thread */ +static int alloc_emergency_buffers_per_thread(void) +{ + int idx; + + th_ctx->emergency_bufs_left = global.tune.reserved_bufs; + th_ctx->emergency_bufs = calloc(global.tune.reserved_bufs, sizeof(*th_ctx->emergency_bufs)); + if (!th_ctx->emergency_bufs) + return 0; + + for (idx = 0; idx < global.tune.reserved_bufs; idx++) { + /* reserved bufs are not subject to the limit, so we must push it */ + if (_HA_ATOMIC_LOAD(&pool_head_buffer->limit)) + _HA_ATOMIC_INC(&pool_head_buffer->limit); + th_ctx->emergency_bufs[idx] = pool_alloc_flag(pool_head_buffer, POOL_F_NO_POISON | POOL_F_NO_FAIL); + if (!th_ctx->emergency_bufs[idx]) + return 0; + } + + return 1; +} + +/* frees the thread's emergency buffers */ +static void free_emergency_buffers_per_thread(void) +{ + int idx; + + if (th_ctx->emergency_bufs) { + for (idx = 0; idx < global.tune.reserved_bufs; idx++) + pool_free(pool_head_buffer, th_ctx->emergency_bufs[idx]); + } + + ha_free(&th_ctx->emergency_bufs); +} + +/* config keyword parsers */ +static struct cfg_kw_list cfg_kws = {ILH, { + { CFG_GLOBAL, "tune.buffers.limit", cfg_parse_tune_buffers_limit }, + { CFG_GLOBAL, "tune.buffers.reserve", cfg_parse_tune_buffers_reserve }, + { 0, NULL, NULL } +}}; + +INITCALL1(STG_REGISTER, cfg_register_keywords, &cfg_kws); +REGISTER_PER_THREAD_ALLOC(alloc_emergency_buffers_per_thread); +REGISTER_PER_THREAD_FREE(free_emergency_buffers_per_thread); + /* * Local variables: * c-indent-level: 8 |