Diffstat (limited to 'src/dynbuf.c')
-rw-r--r--  src/dynbuf.c  131
1 file changed, 122 insertions, 9 deletions
diff --git a/src/dynbuf.c b/src/dynbuf.c
index 712e334..aec9667 100644
--- a/src/dynbuf.c
+++ b/src/dynbuf.c
@@ -15,10 +15,12 @@
 #include <string.h>
 
 #include <haproxy/api.h>
+#include <haproxy/cfgparse.h>
 #include <haproxy/dynbuf.h>
 #include <haproxy/global.h>
 #include <haproxy/list.h>
 #include <haproxy/pool.h>
+#include <haproxy/tools.h>
 
 struct pool_head *pool_head_buffer __read_mostly;
@@ -28,13 +30,24 @@ int init_buffer()
 	void *buffer;
 	int thr;
 	int done;
+	int i;
 
 	pool_head_buffer = create_pool("buffer", global.tune.bufsize, MEM_F_SHARED|MEM_F_EXACT);
 	if (!pool_head_buffer)
 		return 0;
 
-	for (thr = 0; thr < MAX_THREADS; thr++)
-		LIST_INIT(&ha_thread_ctx[thr].buffer_wq);
+	/* make sure any change to the queues assignment isn't overlooked */
+	BUG_ON(DB_PERMANENT - DB_UNLIKELY - 1 != DYNBUF_NBQ);
+	BUG_ON(DB_MUX_RX_Q < DB_SE_RX_Q || DB_MUX_RX_Q >= DYNBUF_NBQ);
+	BUG_ON(DB_SE_RX_Q < DB_CHANNEL_Q || DB_SE_RX_Q >= DYNBUF_NBQ);
+	BUG_ON(DB_CHANNEL_Q < DB_MUX_TX_Q || DB_CHANNEL_Q >= DYNBUF_NBQ);
+	BUG_ON(DB_MUX_TX_Q >= DYNBUF_NBQ);
+
+	for (thr = 0; thr < MAX_THREADS; thr++) {
+		for (i = 0; i < DYNBUF_NBQ; i++)
+			LIST_INIT(&ha_thread_ctx[thr].buffer_wq[i]);
+		ha_thread_ctx[thr].bufq_map = 0;
+	}
 
 	/* The reserved buffer is what we leave behind us. Thus we always need
@@ -102,6 +115,7 @@ void buffer_dump(FILE *o, struct buffer *b, int from, int to)
 void __offer_buffers(void *from, unsigned int count)
 {
 	struct buffer_wait *wait, *wait_back;
+	int q;
 
 	/* For now, we consider that all objects need 1 buffer, so we can stop
 	 * waking up them once we have enough of them to eat all the available
@@ -109,18 +123,117 @@ void __offer_buffers(void *from, unsigned int count)
 	 * other tasks, but that's a rough estimate. Similarly, for each cached
 	 * event we'll need 1 buffer.
 	 */
-	list_for_each_entry_safe(wait, wait_back, &th_ctx->buffer_wq, list) {
-		if (!count)
-			break;
-
-		if (wait->target == from || !wait->wakeup_cb(wait->target))
+	for (q = 0; q < DYNBUF_NBQ; q++) {
+		if (!(th_ctx->bufq_map & (1 << q)))
 			continue;
+		BUG_ON_HOT(LIST_ISEMPTY(&th_ctx->buffer_wq[q]));
+
+		list_for_each_entry_safe(wait, wait_back, &th_ctx->buffer_wq[q], list) {
+			if (!count)
+				break;
+
+			if (wait->target == from || !wait->wakeup_cb(wait->target))
+				continue;
+
+			LIST_DEL_INIT(&wait->list);
+			count--;
+		}
+		if (LIST_ISEMPTY(&th_ctx->buffer_wq[q]))
+			th_ctx->bufq_map &= ~(1 << q);
+	}
+}
+
+/* config parser for global "tune.buffers.limit", accepts a number >= 0 */
+static int cfg_parse_tune_buffers_limit(char **args, int section_type, struct proxy *curpx,
+                                        const struct proxy *defpx, const char *file, int line,
+                                        char **err)
+{
+	int limit;
 
-		LIST_DEL_INIT(&wait->list);
-		count--;
+	if (too_many_args(1, args, err, NULL))
+		return -1;
+
+	limit = atoi(args[1]);
+	if (limit < 0) {
+		memprintf(err, "'%s' expects a non-negative number but got '%s'.", args[0], args[1]);
+		return -1;
+	}
+
+	global.tune.buf_limit = limit;
+	if (global.tune.buf_limit) {
+		if (global.tune.buf_limit < 3)
+			global.tune.buf_limit = 3;
 	}
+
+	return 0;
 }
 
+/* config parser for global "tune.buffers.reserve", accepts a number >= 0 */
+static int cfg_parse_tune_buffers_reserve(char **args, int section_type, struct proxy *curpx,
+                                          const struct proxy *defpx, const char *file, int line,
+                                          char **err)
+{
+	int reserve;
+
+	if (too_many_args(1, args, err, NULL))
+		return -1;
+
+	reserve = atoi(args[1]);
+	if (reserve < 0) {
+		memprintf(err, "'%s' expects a non-negative number but got '%s'.", args[0], args[1]);
+		return -1;
+	}
+
+	global.tune.reserved_bufs = reserve;
+	return 0;
+}
+
+/* allocate emergency buffers for the thread */
+static int alloc_emergency_buffers_per_thread(void)
+{
+	int idx;
+
+	th_ctx->emergency_bufs_left = global.tune.reserved_bufs;
+	th_ctx->emergency_bufs = calloc(global.tune.reserved_bufs, sizeof(*th_ctx->emergency_bufs));
+	if (!th_ctx->emergency_bufs)
+		return 0;
+
+	for (idx = 0; idx < global.tune.reserved_bufs; idx++) {
+		/* reserved bufs are not subject to the limit, so we must push it */
+		if (_HA_ATOMIC_LOAD(&pool_head_buffer->limit))
+			_HA_ATOMIC_INC(&pool_head_buffer->limit);
+		th_ctx->emergency_bufs[idx] = pool_alloc_flag(pool_head_buffer, POOL_F_NO_POISON | POOL_F_NO_FAIL);
+		if (!th_ctx->emergency_bufs[idx])
+			return 0;
+	}
+
+	return 1;
+}
+
+/* frees the thread's emergency buffers */
+static void free_emergency_buffers_per_thread(void)
+{
+	int idx;
+
+	if (th_ctx->emergency_bufs) {
+		for (idx = 0; idx < global.tune.reserved_bufs; idx++)
+			pool_free(pool_head_buffer, th_ctx->emergency_bufs[idx]);
+	}
+
+	ha_free(&th_ctx->emergency_bufs);
+}
+
+/* config keyword parsers */
+static struct cfg_kw_list cfg_kws = {ILH, {
+	{ CFG_GLOBAL, "tune.buffers.limit",   cfg_parse_tune_buffers_limit },
+	{ CFG_GLOBAL, "tune.buffers.reserve", cfg_parse_tune_buffers_reserve },
+	{ 0, NULL, NULL }
+}};
+
+INITCALL1(STG_REGISTER, cfg_register_keywords, &cfg_kws);
+REGISTER_PER_THREAD_ALLOC(alloc_emergency_buffers_per_thread);
+REGISTER_PER_THREAD_FREE(free_emergency_buffers_per_thread);
+
 /*
  * Local variables:
  * c-indent-level: 8
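
Editor's note: the diff above replaces the single per-thread buffer_wq wait list with an array of DYNBUF_NBQ lists indexed by allocation criticality, plus a bufq_map bitmap so __offer_buffers() only walks queues that actually hold waiters, and it registers the tune.buffers.limit / tune.buffers.reserve global keywords (per the parser, a non-zero limit below 3 is raised to 3). The sketch below is a minimal, standalone illustration of that bitmap bookkeeping pattern, not HAProxy code: NBQ, queue_len, enqueue_waiter() and offer_buffers() are made-up stand-ins, with plain counters in place of the real wait lists and wakeup callbacks.

/* Illustrative only: a simplified model of the buffer_wq[]/bufq_map pair. */
#include <stdio.h>

#define NBQ 4				/* stand-in for DYNBUF_NBQ */

static unsigned int bufq_map;		/* bit q set <=> queue q has waiters */
static int queue_len[NBQ];		/* stand-in for buffer_wq[q] list length */

static void enqueue_waiter(int q)
{
	queue_len[q]++;
	bufq_map |= 1U << q;		/* mark the queue as non-empty */
}

/* hand out 'count' buffers, touching only queues whose bit is set */
static void offer_buffers(unsigned int count)
{
	int q;

	for (q = 0; q < NBQ; q++) {
		if (!(bufq_map & (1U << q)))
			continue;	/* empty queue skipped without walking a list */
		while (count && queue_len[q]) {
			queue_len[q]--;	/* wake one waiter */
			count--;
		}
		if (!queue_len[q])
			bufq_map &= ~(1U << q);	/* queue drained: clear its bit */
	}
}

int main(void)
{
	enqueue_waiter(0);
	enqueue_waiter(2);
	offer_buffers(1);		/* lower-numbered queues are served first */
	printf("bufq_map=%#x\n", bufq_map);	/* prints 0x4: only queue 2 still waits */
	return 0;
}

Scanning from queue 0 upward serves more critical waiters before opportunistic ones, and clearing a queue's bit once it drains keeps the common no-waiter case down to a single bitmap test.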