diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-06-03 05:11:10 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-06-03 05:11:10 +0000 |
commit | cff6d757e3ba609c08ef2aaa00f07e53551e5bf6 (patch) | |
tree | 08c4fc3255483ad397d712edb4214ded49149fd9 /src/applet.c | |
parent | Adding upstream version 2.9.7. (diff) | |
download | haproxy-upstream/3.0.0.tar.xz haproxy-upstream/3.0.0.zip |
Adding upstream version 3.0.0.upstream/3.0.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/applet.c')
-rw-r--r-- | src/applet.c | 565 |
1 files changed, 516 insertions, 49 deletions
diff --git a/src/applet.c b/src/applet.c index b695a9f..c528963 100644 --- a/src/applet.c +++ b/src/applet.c @@ -15,13 +15,17 @@ #include <haproxy/api.h> #include <haproxy/applet.h> +#include <haproxy/cfgparse.h> #include <haproxy/channel.h> +#include <haproxy/htx.h> #include <haproxy/list.h> #include <haproxy/sc_strm.h> #include <haproxy/stconn.h> #include <haproxy/stream.h> #include <haproxy/task.h> #include <haproxy/trace.h> +#include <haproxy/vecpair.h> +#include <haproxy/xref.h> unsigned int nb_applets = 0; @@ -50,6 +54,14 @@ static const struct trace_event applet_trace_events[] = { { .mask = APPLET_EV_ERR, .name = "app_err", .desc = "error on appctx" }, #define APPLET_EV_START (1ULL << 5) { .mask = APPLET_EV_START, .name = "app_start", .desc = "start appctx" }, +#define APPLET_EV_RECV (1ULL << 6) + { .mask = APPLET_EV_START, .name = "app_receive", .desc = "RX on appctx" }, +#define APPLET_EV_SEND (1ULL << 7) + { .mask = APPLET_EV_START, .name = "app_send", .desc = "TX on appctx" }, +#define APPLET_EV_BLK (1ULL << 8) + { .mask = APPLET_EV_START, .name = "app_blk", .desc = "appctx blocked" }, +#define APPLET_EV_WAKE (1ULL << 9) + { .mask = APPLET_EV_START, .name = "app_wake", .desc = "appctx woken up" }, {} }; @@ -129,9 +141,9 @@ static void applet_trace(enum trace_level level, uint64_t mask, const struct tra if (src->verbosity == STRM_VERB_CLEAN) return; - chunk_appendf(&trace_buf, " appctx=%p .t=%p .t.exp=%d .state=%d .st0=%d .st1=%d", + chunk_appendf(&trace_buf, " appctx=%p .t=%p .t.exp=%d .flags=0x%x .st0=%d .st1=%d to_fwd=%lu", appctx, appctx->t, tick_isset(appctx->t->expire) ? TICKS_TO_MS(appctx->t->expire - now_ms) : TICK_ETERNITY, - appctx->state, appctx->st0, appctx->st1); + appctx->flags, appctx->st0, appctx->st1, (ulong)appctx->to_forward); if (!sc || src->verbosity == STRM_VERB_MINIMAL) return; @@ -167,21 +179,41 @@ static void applet_trace(enum trace_level level, uint64_t mask, const struct tra (src->verbosity == STRM_VERB_ADVANCED && src->level < TRACE_LEVEL_DATA)) return; - /* channels' buffer info */ - if (s->flags & SF_HTX) { - struct htx *ichtx = htxbuf(&ic->buf); - struct htx *ochtx = htxbuf(&oc->buf); + if (appctx->t->process == task_run_applet) { + /* channels' buffer info */ + if (s->flags & SF_HTX) { + struct htx *ichtx = htxbuf(&ic->buf); + struct htx *ochtx = htxbuf(&oc->buf); - chunk_appendf(&trace_buf, " htx=(%u/%u#%u, %u/%u#%u)", - ichtx->data, ichtx->size, htx_nbblks(ichtx), - ochtx->data, ochtx->size, htx_nbblks(ochtx)); + chunk_appendf(&trace_buf, " htx=(%u/%u#%u, %u/%u#%u)", + ichtx->data, ichtx->size, htx_nbblks(ichtx), + ochtx->data, ochtx->size, htx_nbblks(ochtx)); + } + else { + chunk_appendf(&trace_buf, " buf=(%u@%p+%u/%u, %u@%p+%u/%u)", + (unsigned int)b_data(&ic->buf), b_orig(&ic->buf), + (unsigned int)b_head_ofs(&ic->buf), (unsigned int)b_size(&ic->buf), + (unsigned int)b_data(&oc->buf), b_orig(&oc->buf), + (unsigned int)b_head_ofs(&oc->buf), (unsigned int)b_size(&oc->buf)); + } } else { - chunk_appendf(&trace_buf, " buf=(%u@%p+%u/%u, %u@%p+%u/%u)", - (unsigned int)b_data(&ic->buf), b_orig(&ic->buf), - (unsigned int)b_head_ofs(&ic->buf), (unsigned int)b_size(&ic->buf), - (unsigned int)b_data(&oc->buf), b_orig(&oc->buf), - (unsigned int)b_head_ofs(&oc->buf), (unsigned int)b_size(&oc->buf)); + /* RX/TX buffer info */ + if (s->flags & SF_HTX) { + struct htx *rxhtx = htxbuf(&appctx->inbuf); + struct htx *txhtx = htxbuf(&appctx->outbuf); + + chunk_appendf(&trace_buf, " htx=(%u/%u#%u, %u/%u#%u)", + rxhtx->data, rxhtx->size, htx_nbblks(rxhtx), + txhtx->data, txhtx->size, htx_nbblks(txhtx)); + } + else { + chunk_appendf(&trace_buf, " buf=(%u@%p+%u/%u, %u@%p+%u/%u)", + (unsigned int)b_data(&appctx->inbuf), b_orig(&appctx->inbuf), + (unsigned int)b_head_ofs(&appctx->inbuf), (unsigned int)b_size(&appctx->inbuf), + (unsigned int)b_data(&appctx->outbuf), b_orig(&appctx->outbuf), + (unsigned int)b_head_ofs(&appctx->outbuf), (unsigned int)b_size(&appctx->outbuf)); + } } } @@ -207,7 +239,7 @@ struct appctx *appctx_new_on(struct applet *applet, struct sedesc *sedesc, int t goto fail_appctx; } - LIST_INIT(&appctx->wait_entry); + MT_LIST_INIT(&appctx->wait_entry); appctx->obj_type = OBJ_TYPE_APPCTX; appctx->applet = applet; appctx->sess = NULL; @@ -229,7 +261,18 @@ struct appctx *appctx_new_on(struct applet *applet, struct sedesc *sedesc, int t } appctx->sedesc = sedesc; - appctx->t->process = task_run_applet; + + appctx->flags = 0; + appctx->inbuf = BUF_NULL; + appctx->outbuf = BUF_NULL; + appctx->to_forward = 0; + + if (applet->rcv_buf != NULL && applet->snd_buf != NULL) { + appctx->t->process = task_process_applet; + applet_fl_set(appctx, APPCTX_FL_INOUT_BUFS); + } + else + appctx->t->process = task_run_applet; appctx->t->context = appctx; LIST_INIT(&appctx->buffer_wait.list); @@ -314,7 +357,7 @@ void appctx_free(struct appctx *appctx) /* if it's running, or about to run, defer the freeing * until the callback is called. */ - appctx->state |= APPLET_WANT_DIE; + applet_fl_set(appctx, APPCTX_FL_WANT_DIE); task_wakeup(appctx->t, TASK_WOKEN_OTHER); TRACE_DEVEL("Cannot release APPCTX now, wake it up", APPLET_EV_FREE, appctx); } @@ -348,55 +391,366 @@ void applet_reset_svcctx(struct appctx *appctx) appctx->svcctx = NULL; } -/* call the applet's release() function if any, and marks the sedesc as shut. - * Needs to be called upon close(). +/* call the applet's release() function if any, and marks the sedesc as shut + * once both read and write side are shut. Needs to be called upon close(). */ void appctx_shut(struct appctx *appctx) { - if (se_fl_test(appctx->sedesc, SE_FL_SHR | SE_FL_SHW)) + if (applet_fl_test(appctx, APPCTX_FL_SHUTDOWN)) return; TRACE_ENTER(APPLET_EV_RELEASE, appctx); + if (appctx->applet->release) appctx->applet->release(appctx); + applet_fl_set(appctx, APPCTX_FL_SHUTDOWN); - if (LIST_INLIST(&appctx->buffer_wait.list)) - LIST_DEL_INIT(&appctx->buffer_wait.list); + b_dequeue(&appctx->buffer_wait); - se_fl_set(appctx->sedesc, SE_FL_SHRR | SE_FL_SHWN); TRACE_LEAVE(APPLET_EV_RELEASE, appctx); } +/* releases unused buffers after processing. It will try to wake up as many + * entities as the number of buffers that it releases. + */ +static void appctx_release_buffers(struct appctx * appctx) +{ + int offer = 0; + + if (b_size(&appctx->inbuf) && !b_data(&appctx->inbuf)) { + offer++; + b_free(&appctx->inbuf); + } + if (b_size(&appctx->outbuf) && !b_data(&appctx->outbuf)) { + offer++; + b_free(&appctx->outbuf); + } + + /* if we're certain to have at least 1 buffer available, and there is + * someone waiting, we can wake up a waiter and offer them. + */ + if (offer) + offer_buffers(appctx, offer); +} + /* Callback used to wake up an applet when a buffer is available. The applet * <appctx> is woken up if an input buffer was requested for the associated - * stream connector. In this case the buffer is immediately allocated and the - * function returns 1. Otherwise it returns 0. Note that this automatically - * covers multiple wake-up attempts by ensuring that the same buffer will not - * be accounted for multiple times. + * stream connector. In this case the buffer is expected to be allocated later, + * the applet is woken up, and the function returns 1 to mention this buffer is + * expected to be used. Otherwise it returns 0. */ int appctx_buf_available(void *arg) { struct appctx *appctx = arg; struct stconn *sc = appctx_sc(appctx); + int ret = 0; + + if (applet_fl_test(appctx, APPCTX_FL_INBLK_ALLOC)) { + applet_fl_clr(appctx, APPCTX_FL_INBLK_ALLOC); + applet_fl_set(appctx, APPCTX_FL_IN_MAYALLOC); + TRACE_STATE("unblocking appctx on inbuf allocation", APPLET_EV_RECV|APPLET_EV_BLK|APPLET_EV_WAKE, appctx); + ret = 1; + } + + if (applet_fl_test(appctx, APPCTX_FL_OUTBLK_ALLOC)) { + applet_fl_clr(appctx, APPCTX_FL_OUTBLK_ALLOC); + applet_fl_set(appctx, APPCTX_FL_OUT_MAYALLOC); + TRACE_STATE("unblocking appctx on outbuf allocation", APPLET_EV_SEND|APPLET_EV_BLK|APPLET_EV_WAKE, appctx); + ret = 1; + } + + /* allocation requested ? if no, give up. */ + if (sc->flags & SC_FL_NEED_BUFF) { + sc_have_buff(sc); + ret = 1; + } + + /* The requested buffer might already have been allocated (channel, + * fast-forward etc), in which case we won't need to take that one. + * Otherwise we expect to take it. + */ + if (!c_size(sc_ic(sc)) && !sc_ep_have_ff_data(sc_opposite(sc))) + ret = 1; + leave: + if (ret) + task_wakeup(appctx->t, TASK_WOKEN_RES); + return ret; +} + +size_t appctx_htx_rcv_buf(struct appctx *appctx, struct buffer *buf, size_t count, unsigned int flags) +{ + struct htx *appctx_htx = htx_from_buf(&appctx->outbuf); + struct htx *buf_htx = NULL; + size_t ret = 0; + + if (htx_is_empty(appctx_htx)) { + htx_to_buf(appctx_htx, &appctx->outbuf); + goto out; + } + + ret = appctx_htx->data; + buf_htx = htx_from_buf(buf); + if (htx_is_empty(buf_htx) && htx_used_space(appctx_htx) <= count) { + htx_to_buf(buf_htx, buf); + htx_to_buf(appctx_htx, &appctx->outbuf); + b_xfer(buf, &appctx->outbuf, b_data(&appctx->outbuf)); + goto out; + } + + htx_xfer_blks(buf_htx, appctx_htx, count, HTX_BLK_UNUSED); + buf_htx->flags |= (appctx_htx->flags & (HTX_FL_PARSING_ERROR|HTX_FL_PROCESSING_ERROR)); + if (htx_is_empty(appctx_htx)) { + buf_htx->flags |= (appctx_htx->flags & HTX_FL_EOM); + } + buf_htx->extra = (appctx_htx->extra ? (appctx_htx->data + appctx_htx->extra) : 0); + htx_to_buf(buf_htx, buf); + htx_to_buf(appctx_htx, &appctx->outbuf); + ret -= appctx_htx->data; + + out: + return ret; +} + +size_t appctx_raw_rcv_buf(struct appctx *appctx, struct buffer *buf, size_t count, unsigned int flags) +{ + return b_xfer(buf, &appctx->outbuf, MIN(count, b_data(&appctx->outbuf))); +} + +size_t appctx_rcv_buf(struct stconn *sc, struct buffer *buf, size_t count, unsigned int flags) +{ + struct appctx *appctx = __sc_appctx(sc); + size_t ret = 0; + + TRACE_ENTER(APPLET_EV_RECV, appctx); + + if (applet_fl_test(appctx, APPCTX_FL_OUTBLK_ALLOC)) + goto end; + + if (!count) + goto end; + + if (!appctx_get_buf(appctx, &appctx->outbuf)) { + TRACE_STATE("waiting for appctx outbuf allocation", APPLET_EV_RECV|APPLET_EV_BLK, appctx); + goto end; + } + + if (flags & CO_RFL_BUF_FLUSH) + applet_fl_set(appctx, APPCTX_FL_FASTFWD); + + ret = appctx->applet->rcv_buf(appctx, buf, count, flags); + if (ret) + applet_fl_clr(appctx, APPCTX_FL_OUTBLK_FULL); + + if (b_data(&appctx->outbuf)) { + se_fl_set(appctx->sedesc, SE_FL_RCV_MORE | SE_FL_WANT_ROOM); + TRACE_STATE("waiting for more room", APPLET_EV_RECV|APPLET_EV_BLK, appctx); + } + else { + se_fl_clr(appctx->sedesc, SE_FL_RCV_MORE | SE_FL_WANT_ROOM); + if (applet_fl_test(appctx, APPCTX_FL_EOI)) { + se_fl_set(appctx->sedesc, SE_FL_EOI); + TRACE_STATE("report EOI to SE", APPLET_EV_RECV|APPLET_EV_BLK, appctx); + } + if (applet_fl_test(appctx, APPCTX_FL_EOS)) { + se_fl_set(appctx->sedesc, SE_FL_EOS); + TRACE_STATE("report EOS to SE", APPLET_EV_RECV|APPLET_EV_BLK, appctx); + } + if (applet_fl_test(appctx, APPCTX_FL_ERROR)) { + se_fl_set(appctx->sedesc, SE_FL_ERROR); + TRACE_STATE("report ERROR to SE", APPLET_EV_RECV|APPLET_EV_BLK, appctx); + } + } + + end: + TRACE_LEAVE(APPLET_EV_RECV, appctx); + return ret; +} + +size_t appctx_htx_snd_buf(struct appctx *appctx, struct buffer *buf, size_t count, unsigned int flags) +{ + struct htx *appctx_htx = htx_from_buf(&appctx->inbuf); + struct htx *buf_htx = htx_from_buf(buf); + size_t ret = 0; + + ret = buf_htx->data; + if (htx_is_empty(appctx_htx) && buf_htx->data == count) { + htx_to_buf(appctx_htx, &appctx->inbuf); + htx_to_buf(buf_htx, buf); + b_xfer(&appctx->inbuf, buf, b_data(buf)); + goto end; + } + + htx_xfer_blks(appctx_htx, buf_htx, count, HTX_BLK_UNUSED); + if (htx_is_empty(buf_htx)) { + appctx_htx->flags |= (buf_htx->flags & HTX_FL_EOM); + } + + appctx_htx->extra = (buf_htx->extra ? (buf_htx->data + buf_htx->extra) : 0); + htx_to_buf(appctx_htx, &appctx->outbuf); + htx_to_buf(buf_htx, buf); + ret -= buf_htx->data; +end: + if (ret < count) { + applet_fl_set(appctx, APPCTX_FL_INBLK_FULL); + TRACE_STATE("report appctx inbuf is full", APPLET_EV_SEND|APPLET_EV_BLK, appctx); + } + return ret; +} + +size_t appctx_raw_snd_buf(struct appctx *appctx, struct buffer *buf, size_t count, unsigned flags) +{ + size_t ret = 0; + + ret = b_xfer(&appctx->inbuf, buf, MIN(b_room(&appctx->inbuf), count)); + if (ret < count) { + applet_fl_set(appctx, APPCTX_FL_INBLK_FULL); + TRACE_STATE("report appctx inbuf is full", APPLET_EV_SEND|APPLET_EV_BLK, appctx); + } + end: + return ret; +} + +size_t appctx_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, unsigned int flags) +{ + struct appctx *appctx = __sc_appctx(sc); + size_t ret = 0; + + TRACE_ENTER(APPLET_EV_SEND, appctx); + + if (applet_fl_test(appctx, (APPCTX_FL_ERROR|APPCTX_FL_ERR_PENDING))) + goto end; + + if (applet_fl_test(appctx, (APPCTX_FL_INBLK_FULL|APPCTX_FL_INBLK_ALLOC))) + goto end; + + if (!count) + goto end; + + if (!appctx_get_buf(appctx, &appctx->inbuf)) { + TRACE_STATE("waiting for appctx inbuf allocation", APPLET_EV_SEND|APPLET_EV_BLK, appctx); + goto end; + } + + ret = appctx->applet->snd_buf(appctx, buf, count, flags); + + end: + if (applet_fl_test(appctx, (APPCTX_FL_ERROR|APPCTX_FL_ERR_PENDING))) { + BUG_ON((applet_fl_get(appctx) & (APPCTX_FL_EOS|APPCTX_FL_ERROR|APPCTX_FL_ERR_PENDING)) == (APPCTX_FL_EOS|APPCTX_FL_ERR_PENDING)); + applet_set_error(appctx); + TRACE_STATE("report ERR_PENDING/ERROR to SE", APPLET_EV_SEND, appctx); + } + TRACE_LEAVE(APPLET_EV_SEND, appctx); + return ret; +} + +int appctx_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags) +{ + struct appctx *appctx = __sc_appctx(sc); + struct xref *peer; + struct sedesc *sdo = NULL; + unsigned int len, nego_flags = NEGO_FF_FL_NONE; + int ret = 0; + + TRACE_ENTER(APPLET_EV_RECV, appctx); + + applet_fl_set(appctx, APPCTX_FL_FASTFWD); + + /* TODO: outbuf must be empty. Find a better way to handle that but for now just return -1 */ + if (b_data(&appctx->outbuf)) { + TRACE_STATE("Output buffer not empty, cannot fast-forward data", APPLET_EV_RECV, appctx); + return -1; + } + + peer = xref_get_peer_and_lock(&appctx->sedesc->xref); + if (!peer) { + TRACE_STATE("Opposite endpoint not available yet", APPLET_EV_RECV, appctx); + goto end; + } + sdo = container_of(peer, struct sedesc, xref); + xref_unlock(&appctx->sedesc->xref, peer); + + if (appctx->to_forward && count > appctx->to_forward) { + count = appctx->to_forward; + nego_flags |= NEGO_FF_FL_EXACT_SIZE; + } - /* allocation requested ? */ - if (!(sc->flags & SC_FL_NEED_BUFF)) - return 0; + len = se_nego_ff(sdo, &BUF_NULL, count, nego_flags); + if (sdo->iobuf.flags & IOBUF_FL_NO_FF) { + sc_ep_clr(sc, SE_FL_MAY_FASTFWD_PROD); + applet_fl_clr(appctx, APPCTX_FL_FASTFWD); + TRACE_DEVEL("Fast-forwarding not supported by opposite endpoint, disable it", APPLET_EV_RECV, appctx); + goto end; + } + if (sdo->iobuf.flags & IOBUF_FL_FF_BLOCKED) { + sc_ep_set(sc, /* SE_FL_RCV_MORE | */SE_FL_WANT_ROOM); + TRACE_STATE("waiting for more room", APPLET_EV_RECV|APPLET_EV_BLK, appctx); + goto end; + } + + b_add(sdo->iobuf.buf, sdo->iobuf.offset); + ret = appctx->applet->fastfwd(appctx, sdo->iobuf.buf, len, 0); + b_sub(sdo->iobuf.buf, sdo->iobuf.offset); + sdo->iobuf.data += ret; + + if (se_fl_test(appctx->sedesc, SE_FL_WANT_ROOM)) { + /* The applet request more room, report the info at the iobuf level */ + sdo->iobuf.flags |= IOBUF_FL_FF_BLOCKED; + TRACE_STATE("waiting for more room", APPLET_EV_RECV|APPLET_EV_BLK, appctx); + } + + if (applet_fl_test(appctx, APPCTX_FL_EOI)) { + se_fl_set(appctx->sedesc, SE_FL_EOI); + sdo->iobuf.flags |= IOBUF_FL_EOI; /* TODO: it may be good to have a flag to be sure we can + * forward the EOI the to consumer side + */ + TRACE_STATE("report EOI to SE", APPLET_EV_RECV|APPLET_EV_BLK, appctx); + } + if (applet_fl_test(appctx, APPCTX_FL_EOS)) { + se_fl_set(appctx->sedesc, SE_FL_EOS); + TRACE_STATE("report EOS to SE", APPLET_EV_RECV|APPLET_EV_BLK, appctx); + } + if (applet_fl_test(appctx, APPCTX_FL_ERROR)) { + se_fl_set(appctx->sedesc, SE_FL_ERROR); + TRACE_STATE("report ERROR to SE", APPLET_EV_RECV|APPLET_EV_BLK, appctx); + } + /* else */ + /* applet_have_more_data(appctx); */ - sc_have_buff(sc); + if (se_done_ff(sdo) != 0) { + /* Something was forwarding, don't reclaim more room */ + se_fl_clr(appctx->sedesc, SE_FL_WANT_ROOM); + TRACE_STATE("more room available", APPLET_EV_RECV|APPLET_EV_BLK, appctx); + } + +end: + TRACE_LEAVE(APPLET_EV_RECV, appctx); + return ret; +} - /* was already allocated another way ? if so, don't take this one */ - if (c_size(sc_ic(sc)) || sc_ep_have_ff_data(sc_opposite(sc))) - return 0; +/* Atomically append a line to applet <ctx>'s output, appending a trailing LF. + * The line is read from vectors <v1> and <v2> at offset <ofs> relative to the + * area's origin, for <len> bytes. It returns the number of bytes consumed from + * the input vectors on success, -1 if it temporarily cannot (buffer full), -2 + * if it will never be able to (too large msg). The vectors are not modified. + * The caller is responsible for making sure that there are at least ofs+len + * bytes in the input vectors. + */ +ssize_t applet_append_line(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len) +{ + struct appctx *appctx = ctx; - /* allocation possible now ? */ - if (!b_alloc(&sc_ic(sc)->buf)) { - sc_need_buff(sc); - return 0; + if (unlikely(len + 1 > b_size(&trash))) { + /* too large a message to ever fit, let's skip it */ + return -2; } - task_wakeup(appctx->t, TASK_WOKEN_RES); - return 1; + chunk_reset(&trash); + vp_peek_ofs(v1, v2, ofs, trash.area, len); + trash.data += len; + trash.area[trash.data++] = '\n'; + if (applet_putchk(appctx, &trash) == -1) + return -1; + return len; } /* Default applet handler */ @@ -404,13 +758,14 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state) { struct appctx *app = context; struct stconn *sc, *sco; + struct channel *ic, *oc; unsigned int rate; - size_t count; + size_t input, output; int did_send = 0; TRACE_ENTER(APPLET_EV_PROCESS, app); - if (app->state & APPLET_WANT_DIE) { + if (applet_fl_test(app, APPCTX_FL_WANT_DIE)) { TRACE_DEVEL("APPCTX want die, release it", APPLET_EV_FREE, app); __appctx_free(app); return NULL; @@ -434,6 +789,9 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state) sc = appctx_sc(app); sco = sc_opposite(sc); + ic = sc_ic(sc); + oc = sc_oc(sc); + /* We always pretend the applet can't get and doesn't want to * put, it's up to it to change this if needed. This ensures * that one applet which ignores any event will not spin. @@ -450,7 +808,10 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state) if (!sc_alloc_ibuf(sc, &app->buffer_wait)) applet_have_more_data(app); - count = co_data(sc_oc(sc)); + channel_check_idletimer(ic); + + input = ic->total; + output = co_data(oc); app->applet->fct(app); TRACE_POINT(APPLET_EV_PROCESS, app); @@ -458,9 +819,9 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state) /* now check if the applet has released some room and forgot to * notify the other side about it. */ - if (count != co_data(sc_oc(sc))) { - sc_oc(sc)->flags |= CF_WRITE_EVENT | CF_WROTE_DATA; - if (sco->room_needed < 0 || channel_recv_max(sc_oc(sc)) >= sco->room_needed) + if (output != co_data(oc)) { + oc->flags |= CF_WRITE_EVENT | CF_WROTE_DATA; + if (sco->room_needed < 0 || channel_recv_max(oc) >= sco->room_needed) sc_have_room(sco); did_send = 1; } @@ -469,14 +830,18 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state) sc_have_room(sco); } - if (sc_ic(sc)->flags & CF_READ_EVENT) + input = ic->total - input; + if (input) { + channel_check_xfer(ic, input); sc_ep_report_read_activity(sc); + } + /* TODO: May be move in appctx_rcv_buf or sc_applet_process ? */ if (sc_waiting_room(sc) && (sc->flags & SC_FL_ABRT_DONE)) { sc_ep_set(sc, SE_FL_EOS|SE_FL_ERROR); } - if (!co_data(sc_oc(sc))) { + if (!co_data(oc)) { if (did_send) sc_ep_report_send_activity(sc); } @@ -495,7 +860,109 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state) } sc->app_ops->wake(sc); - channel_release_buffer(sc_ic(sc), &app->buffer_wait); + channel_release_buffer(ic, &app->buffer_wait); + TRACE_LEAVE(APPLET_EV_PROCESS, app); + return t; +} + + +/* Default applet handler based on IN/OUT buffers. It is a true task here, no a tasklet */ +struct task *task_process_applet(struct task *t, void *context, unsigned int state) +{ + struct appctx *app = context; + struct stconn *sc; + unsigned int rate; + + TRACE_ENTER(APPLET_EV_PROCESS, app); + + if (applet_fl_test(app, APPCTX_FL_WANT_DIE)) { + TRACE_DEVEL("APPCTX want die, release it", APPLET_EV_FREE, app); + __appctx_free(app); + return NULL; + } + + if (se_fl_test(app->sedesc, SE_FL_ORPHAN)) { + /* Finalize init of orphan appctx. .init callback function must + * be defined and it must finalize appctx startup. + */ + BUG_ON(!app->applet->init); + + if (appctx_init(app) == -1) { + TRACE_DEVEL("APPCTX init failed", APPLET_EV_FREE|APPLET_EV_ERR, app); + appctx_free_on_early_error(app); + return NULL; + } + BUG_ON(!app->sess || !appctx_sc(app) || !appctx_strm(app)); + TRACE_DEVEL("APPCTX initialized", APPLET_EV_PROCESS, app); + } + + sc = appctx_sc(app); + + sc_applet_sync_send(sc); + + /* We always pretend the applet can't get and doesn't want to + * put, it's up to it to change this if needed. This ensures + * that one applet which ignores any event will not spin. + */ + applet_need_more_data(app); + applet_have_no_more_data(app); + + app->applet->fct(app); + + TRACE_POINT(APPLET_EV_PROCESS, app); + + if (b_data(&app->outbuf) || se_fl_test(app->sedesc, SE_FL_MAY_FASTFWD_PROD) || + applet_fl_test(app, APPCTX_FL_EOI|APPCTX_FL_EOS|APPCTX_FL_ERROR)) + applet_have_more_data(app); + + sc_applet_sync_recv(sc); + + /* TODO: May be move in appctx_rcv_buf or sc_applet_process ? */ + if (sc_waiting_room(sc) && (sc->flags & SC_FL_ABRT_DONE)) { + sc_ep_set(sc, SE_FL_EOS|SE_FL_ERROR); + } + + /* measure the call rate and check for anomalies when too high */ + if (((b_size(sc_ib(sc)) && sc->flags & SC_FL_NEED_BUFF) || // asks for a buffer which is present + (b_size(sc_ib(sc)) && !b_data(sc_ib(sc)) && sc->flags & SC_FL_NEED_ROOM) || // asks for room in an empty buffer + (b_data(sc_ob(sc)) && sc_is_send_allowed(sc)) || // asks for data already present + (!b_data(sc_ib(sc)) && b_data(sc_ob(sc)) && // didn't return anything ... + (!(sc_oc(sc)->flags & CF_WRITE_EVENT) && (sc->flags & SC_FL_SHUT_WANTED))))) { // ... and left data pending after a shut + rate = update_freq_ctr(&app->call_rate, 1); + if (rate >= 100000 && app->call_rate.prev_ctr) // looped like this more than 100k times over last second + stream_dump_and_crash(&app->obj_type, read_freq_ctr(&app->call_rate)); + } + + sc->app_ops->wake(sc); + appctx_release_buffers(app); TRACE_LEAVE(APPLET_EV_PROCESS, app); return t; } + +/* config parser for global "tune.applet.zero-copy-forwarding" */ +static int cfg_parse_applet_zero_copy_fwd(char **args, int section_type, struct proxy *curpx, + const struct proxy *defpx, const char *file, int line, + char **err) +{ + if (too_many_args(1, args, err, NULL)) + return -1; + + if (strcmp(args[1], "on") == 0) + global.tune.no_zero_copy_fwd &= ~NO_ZERO_COPY_FWD_APPLET; + else if (strcmp(args[1], "off") == 0) + global.tune.no_zero_copy_fwd |= NO_ZERO_COPY_FWD_APPLET; + else { + memprintf(err, "'%s' expects 'on' or 'off'.", args[0]); + return -1; + } + return 0; +} + + +/* config keyword parsers */ +static struct cfg_kw_list cfg_kws = {ILH, { + { CFG_GLOBAL, "tune.applet.zero-copy-forwarding", cfg_parse_applet_zero_copy_fwd }, + { 0, NULL, NULL } +}}; + +INITCALL1(STG_REGISTER, cfg_register_keywords, &cfg_kws); |