summaryrefslogtreecommitdiffstats
path: root/src/applet.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/applet.c')
-rw-r--r--src/applet.c565
1 files changed, 516 insertions, 49 deletions
diff --git a/src/applet.c b/src/applet.c
index b695a9f..c528963 100644
--- a/src/applet.c
+++ b/src/applet.c
@@ -15,13 +15,17 @@
#include <haproxy/api.h>
#include <haproxy/applet.h>
+#include <haproxy/cfgparse.h>
#include <haproxy/channel.h>
+#include <haproxy/htx.h>
#include <haproxy/list.h>
#include <haproxy/sc_strm.h>
#include <haproxy/stconn.h>
#include <haproxy/stream.h>
#include <haproxy/task.h>
#include <haproxy/trace.h>
+#include <haproxy/vecpair.h>
+#include <haproxy/xref.h>
unsigned int nb_applets = 0;
@@ -50,6 +54,14 @@ static const struct trace_event applet_trace_events[] = {
{ .mask = APPLET_EV_ERR, .name = "app_err", .desc = "error on appctx" },
#define APPLET_EV_START (1ULL << 5)
{ .mask = APPLET_EV_START, .name = "app_start", .desc = "start appctx" },
+#define APPLET_EV_RECV (1ULL << 6)
+ { .mask = APPLET_EV_START, .name = "app_receive", .desc = "RX on appctx" },
+#define APPLET_EV_SEND (1ULL << 7)
+ { .mask = APPLET_EV_START, .name = "app_send", .desc = "TX on appctx" },
+#define APPLET_EV_BLK (1ULL << 8)
+ { .mask = APPLET_EV_START, .name = "app_blk", .desc = "appctx blocked" },
+#define APPLET_EV_WAKE (1ULL << 9)
+ { .mask = APPLET_EV_START, .name = "app_wake", .desc = "appctx woken up" },
{}
};
@@ -129,9 +141,9 @@ static void applet_trace(enum trace_level level, uint64_t mask, const struct tra
if (src->verbosity == STRM_VERB_CLEAN)
return;
- chunk_appendf(&trace_buf, " appctx=%p .t=%p .t.exp=%d .state=%d .st0=%d .st1=%d",
+ chunk_appendf(&trace_buf, " appctx=%p .t=%p .t.exp=%d .flags=0x%x .st0=%d .st1=%d to_fwd=%lu",
appctx, appctx->t, tick_isset(appctx->t->expire) ? TICKS_TO_MS(appctx->t->expire - now_ms) : TICK_ETERNITY,
- appctx->state, appctx->st0, appctx->st1);
+ appctx->flags, appctx->st0, appctx->st1, (ulong)appctx->to_forward);
if (!sc || src->verbosity == STRM_VERB_MINIMAL)
return;
@@ -167,21 +179,41 @@ static void applet_trace(enum trace_level level, uint64_t mask, const struct tra
(src->verbosity == STRM_VERB_ADVANCED && src->level < TRACE_LEVEL_DATA))
return;
- /* channels' buffer info */
- if (s->flags & SF_HTX) {
- struct htx *ichtx = htxbuf(&ic->buf);
- struct htx *ochtx = htxbuf(&oc->buf);
+ if (appctx->t->process == task_run_applet) {
+ /* channels' buffer info */
+ if (s->flags & SF_HTX) {
+ struct htx *ichtx = htxbuf(&ic->buf);
+ struct htx *ochtx = htxbuf(&oc->buf);
- chunk_appendf(&trace_buf, " htx=(%u/%u#%u, %u/%u#%u)",
- ichtx->data, ichtx->size, htx_nbblks(ichtx),
- ochtx->data, ochtx->size, htx_nbblks(ochtx));
+ chunk_appendf(&trace_buf, " htx=(%u/%u#%u, %u/%u#%u)",
+ ichtx->data, ichtx->size, htx_nbblks(ichtx),
+ ochtx->data, ochtx->size, htx_nbblks(ochtx));
+ }
+ else {
+ chunk_appendf(&trace_buf, " buf=(%u@%p+%u/%u, %u@%p+%u/%u)",
+ (unsigned int)b_data(&ic->buf), b_orig(&ic->buf),
+ (unsigned int)b_head_ofs(&ic->buf), (unsigned int)b_size(&ic->buf),
+ (unsigned int)b_data(&oc->buf), b_orig(&oc->buf),
+ (unsigned int)b_head_ofs(&oc->buf), (unsigned int)b_size(&oc->buf));
+ }
}
else {
- chunk_appendf(&trace_buf, " buf=(%u@%p+%u/%u, %u@%p+%u/%u)",
- (unsigned int)b_data(&ic->buf), b_orig(&ic->buf),
- (unsigned int)b_head_ofs(&ic->buf), (unsigned int)b_size(&ic->buf),
- (unsigned int)b_data(&oc->buf), b_orig(&oc->buf),
- (unsigned int)b_head_ofs(&oc->buf), (unsigned int)b_size(&oc->buf));
+ /* RX/TX buffer info */
+ if (s->flags & SF_HTX) {
+ struct htx *rxhtx = htxbuf(&appctx->inbuf);
+ struct htx *txhtx = htxbuf(&appctx->outbuf);
+
+ chunk_appendf(&trace_buf, " htx=(%u/%u#%u, %u/%u#%u)",
+ rxhtx->data, rxhtx->size, htx_nbblks(rxhtx),
+ txhtx->data, txhtx->size, htx_nbblks(txhtx));
+ }
+ else {
+ chunk_appendf(&trace_buf, " buf=(%u@%p+%u/%u, %u@%p+%u/%u)",
+ (unsigned int)b_data(&appctx->inbuf), b_orig(&appctx->inbuf),
+ (unsigned int)b_head_ofs(&appctx->inbuf), (unsigned int)b_size(&appctx->inbuf),
+ (unsigned int)b_data(&appctx->outbuf), b_orig(&appctx->outbuf),
+ (unsigned int)b_head_ofs(&appctx->outbuf), (unsigned int)b_size(&appctx->outbuf));
+ }
}
}
@@ -207,7 +239,7 @@ struct appctx *appctx_new_on(struct applet *applet, struct sedesc *sedesc, int t
goto fail_appctx;
}
- LIST_INIT(&appctx->wait_entry);
+ MT_LIST_INIT(&appctx->wait_entry);
appctx->obj_type = OBJ_TYPE_APPCTX;
appctx->applet = applet;
appctx->sess = NULL;
@@ -229,7 +261,18 @@ struct appctx *appctx_new_on(struct applet *applet, struct sedesc *sedesc, int t
}
appctx->sedesc = sedesc;
- appctx->t->process = task_run_applet;
+
+ appctx->flags = 0;
+ appctx->inbuf = BUF_NULL;
+ appctx->outbuf = BUF_NULL;
+ appctx->to_forward = 0;
+
+ if (applet->rcv_buf != NULL && applet->snd_buf != NULL) {
+ appctx->t->process = task_process_applet;
+ applet_fl_set(appctx, APPCTX_FL_INOUT_BUFS);
+ }
+ else
+ appctx->t->process = task_run_applet;
appctx->t->context = appctx;
LIST_INIT(&appctx->buffer_wait.list);
@@ -314,7 +357,7 @@ void appctx_free(struct appctx *appctx)
/* if it's running, or about to run, defer the freeing
* until the callback is called.
*/
- appctx->state |= APPLET_WANT_DIE;
+ applet_fl_set(appctx, APPCTX_FL_WANT_DIE);
task_wakeup(appctx->t, TASK_WOKEN_OTHER);
TRACE_DEVEL("Cannot release APPCTX now, wake it up", APPLET_EV_FREE, appctx);
}
@@ -348,55 +391,366 @@ void applet_reset_svcctx(struct appctx *appctx)
appctx->svcctx = NULL;
}
-/* call the applet's release() function if any, and marks the sedesc as shut.
- * Needs to be called upon close().
+/* call the applet's release() function if any, and marks the sedesc as shut
+ * once both read and write side are shut. Needs to be called upon close().
*/
void appctx_shut(struct appctx *appctx)
{
- if (se_fl_test(appctx->sedesc, SE_FL_SHR | SE_FL_SHW))
+ if (applet_fl_test(appctx, APPCTX_FL_SHUTDOWN))
return;
TRACE_ENTER(APPLET_EV_RELEASE, appctx);
+
if (appctx->applet->release)
appctx->applet->release(appctx);
+ applet_fl_set(appctx, APPCTX_FL_SHUTDOWN);
- if (LIST_INLIST(&appctx->buffer_wait.list))
- LIST_DEL_INIT(&appctx->buffer_wait.list);
+ b_dequeue(&appctx->buffer_wait);
- se_fl_set(appctx->sedesc, SE_FL_SHRR | SE_FL_SHWN);
TRACE_LEAVE(APPLET_EV_RELEASE, appctx);
}
+/* releases unused buffers after processing. It will try to wake up as many
+ * entities as the number of buffers that it releases.
+ */
+static void appctx_release_buffers(struct appctx * appctx)
+{
+ int offer = 0;
+
+ if (b_size(&appctx->inbuf) && !b_data(&appctx->inbuf)) {
+ offer++;
+ b_free(&appctx->inbuf);
+ }
+ if (b_size(&appctx->outbuf) && !b_data(&appctx->outbuf)) {
+ offer++;
+ b_free(&appctx->outbuf);
+ }
+
+ /* if we're certain to have at least 1 buffer available, and there is
+ * someone waiting, we can wake up a waiter and offer them.
+ */
+ if (offer)
+ offer_buffers(appctx, offer);
+}
+
/* Callback used to wake up an applet when a buffer is available. The applet
* <appctx> is woken up if an input buffer was requested for the associated
- * stream connector. In this case the buffer is immediately allocated and the
- * function returns 1. Otherwise it returns 0. Note that this automatically
- * covers multiple wake-up attempts by ensuring that the same buffer will not
- * be accounted for multiple times.
+ * stream connector. In this case the buffer is expected to be allocated later,
+ * the applet is woken up, and the function returns 1 to mention this buffer is
+ * expected to be used. Otherwise it returns 0.
*/
int appctx_buf_available(void *arg)
{
struct appctx *appctx = arg;
struct stconn *sc = appctx_sc(appctx);
+ int ret = 0;
+
+ if (applet_fl_test(appctx, APPCTX_FL_INBLK_ALLOC)) {
+ applet_fl_clr(appctx, APPCTX_FL_INBLK_ALLOC);
+ applet_fl_set(appctx, APPCTX_FL_IN_MAYALLOC);
+ TRACE_STATE("unblocking appctx on inbuf allocation", APPLET_EV_RECV|APPLET_EV_BLK|APPLET_EV_WAKE, appctx);
+ ret = 1;
+ }
+
+ if (applet_fl_test(appctx, APPCTX_FL_OUTBLK_ALLOC)) {
+ applet_fl_clr(appctx, APPCTX_FL_OUTBLK_ALLOC);
+ applet_fl_set(appctx, APPCTX_FL_OUT_MAYALLOC);
+ TRACE_STATE("unblocking appctx on outbuf allocation", APPLET_EV_SEND|APPLET_EV_BLK|APPLET_EV_WAKE, appctx);
+ ret = 1;
+ }
+
+ /* allocation requested ? if no, give up. */
+ if (sc->flags & SC_FL_NEED_BUFF) {
+ sc_have_buff(sc);
+ ret = 1;
+ }
+
+ /* The requested buffer might already have been allocated (channel,
+ * fast-forward etc), in which case we won't need to take that one.
+ * Otherwise we expect to take it.
+ */
+ if (!c_size(sc_ic(sc)) && !sc_ep_have_ff_data(sc_opposite(sc)))
+ ret = 1;
+ leave:
+ if (ret)
+ task_wakeup(appctx->t, TASK_WOKEN_RES);
+ return ret;
+}
+
+size_t appctx_htx_rcv_buf(struct appctx *appctx, struct buffer *buf, size_t count, unsigned int flags)
+{
+ struct htx *appctx_htx = htx_from_buf(&appctx->outbuf);
+ struct htx *buf_htx = NULL;
+ size_t ret = 0;
+
+ if (htx_is_empty(appctx_htx)) {
+ htx_to_buf(appctx_htx, &appctx->outbuf);
+ goto out;
+ }
+
+ ret = appctx_htx->data;
+ buf_htx = htx_from_buf(buf);
+ if (htx_is_empty(buf_htx) && htx_used_space(appctx_htx) <= count) {
+ htx_to_buf(buf_htx, buf);
+ htx_to_buf(appctx_htx, &appctx->outbuf);
+ b_xfer(buf, &appctx->outbuf, b_data(&appctx->outbuf));
+ goto out;
+ }
+
+ htx_xfer_blks(buf_htx, appctx_htx, count, HTX_BLK_UNUSED);
+ buf_htx->flags |= (appctx_htx->flags & (HTX_FL_PARSING_ERROR|HTX_FL_PROCESSING_ERROR));
+ if (htx_is_empty(appctx_htx)) {
+ buf_htx->flags |= (appctx_htx->flags & HTX_FL_EOM);
+ }
+ buf_htx->extra = (appctx_htx->extra ? (appctx_htx->data + appctx_htx->extra) : 0);
+ htx_to_buf(buf_htx, buf);
+ htx_to_buf(appctx_htx, &appctx->outbuf);
+ ret -= appctx_htx->data;
+
+ out:
+ return ret;
+}
+
+size_t appctx_raw_rcv_buf(struct appctx *appctx, struct buffer *buf, size_t count, unsigned int flags)
+{
+ return b_xfer(buf, &appctx->outbuf, MIN(count, b_data(&appctx->outbuf)));
+}
+
+size_t appctx_rcv_buf(struct stconn *sc, struct buffer *buf, size_t count, unsigned int flags)
+{
+ struct appctx *appctx = __sc_appctx(sc);
+ size_t ret = 0;
+
+ TRACE_ENTER(APPLET_EV_RECV, appctx);
+
+ if (applet_fl_test(appctx, APPCTX_FL_OUTBLK_ALLOC))
+ goto end;
+
+ if (!count)
+ goto end;
+
+ if (!appctx_get_buf(appctx, &appctx->outbuf)) {
+ TRACE_STATE("waiting for appctx outbuf allocation", APPLET_EV_RECV|APPLET_EV_BLK, appctx);
+ goto end;
+ }
+
+ if (flags & CO_RFL_BUF_FLUSH)
+ applet_fl_set(appctx, APPCTX_FL_FASTFWD);
+
+ ret = appctx->applet->rcv_buf(appctx, buf, count, flags);
+ if (ret)
+ applet_fl_clr(appctx, APPCTX_FL_OUTBLK_FULL);
+
+ if (b_data(&appctx->outbuf)) {
+ se_fl_set(appctx->sedesc, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
+ TRACE_STATE("waiting for more room", APPLET_EV_RECV|APPLET_EV_BLK, appctx);
+ }
+ else {
+ se_fl_clr(appctx->sedesc, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
+ if (applet_fl_test(appctx, APPCTX_FL_EOI)) {
+ se_fl_set(appctx->sedesc, SE_FL_EOI);
+ TRACE_STATE("report EOI to SE", APPLET_EV_RECV|APPLET_EV_BLK, appctx);
+ }
+ if (applet_fl_test(appctx, APPCTX_FL_EOS)) {
+ se_fl_set(appctx->sedesc, SE_FL_EOS);
+ TRACE_STATE("report EOS to SE", APPLET_EV_RECV|APPLET_EV_BLK, appctx);
+ }
+ if (applet_fl_test(appctx, APPCTX_FL_ERROR)) {
+ se_fl_set(appctx->sedesc, SE_FL_ERROR);
+ TRACE_STATE("report ERROR to SE", APPLET_EV_RECV|APPLET_EV_BLK, appctx);
+ }
+ }
+
+ end:
+ TRACE_LEAVE(APPLET_EV_RECV, appctx);
+ return ret;
+}
+
+size_t appctx_htx_snd_buf(struct appctx *appctx, struct buffer *buf, size_t count, unsigned int flags)
+{
+ struct htx *appctx_htx = htx_from_buf(&appctx->inbuf);
+ struct htx *buf_htx = htx_from_buf(buf);
+ size_t ret = 0;
+
+ ret = buf_htx->data;
+ if (htx_is_empty(appctx_htx) && buf_htx->data == count) {
+ htx_to_buf(appctx_htx, &appctx->inbuf);
+ htx_to_buf(buf_htx, buf);
+ b_xfer(&appctx->inbuf, buf, b_data(buf));
+ goto end;
+ }
+
+ htx_xfer_blks(appctx_htx, buf_htx, count, HTX_BLK_UNUSED);
+ if (htx_is_empty(buf_htx)) {
+ appctx_htx->flags |= (buf_htx->flags & HTX_FL_EOM);
+ }
+
+ appctx_htx->extra = (buf_htx->extra ? (buf_htx->data + buf_htx->extra) : 0);
+ htx_to_buf(appctx_htx, &appctx->outbuf);
+ htx_to_buf(buf_htx, buf);
+ ret -= buf_htx->data;
+end:
+ if (ret < count) {
+ applet_fl_set(appctx, APPCTX_FL_INBLK_FULL);
+ TRACE_STATE("report appctx inbuf is full", APPLET_EV_SEND|APPLET_EV_BLK, appctx);
+ }
+ return ret;
+}
+
+size_t appctx_raw_snd_buf(struct appctx *appctx, struct buffer *buf, size_t count, unsigned flags)
+{
+ size_t ret = 0;
+
+ ret = b_xfer(&appctx->inbuf, buf, MIN(b_room(&appctx->inbuf), count));
+ if (ret < count) {
+ applet_fl_set(appctx, APPCTX_FL_INBLK_FULL);
+ TRACE_STATE("report appctx inbuf is full", APPLET_EV_SEND|APPLET_EV_BLK, appctx);
+ }
+ end:
+ return ret;
+}
+
+size_t appctx_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, unsigned int flags)
+{
+ struct appctx *appctx = __sc_appctx(sc);
+ size_t ret = 0;
+
+ TRACE_ENTER(APPLET_EV_SEND, appctx);
+
+ if (applet_fl_test(appctx, (APPCTX_FL_ERROR|APPCTX_FL_ERR_PENDING)))
+ goto end;
+
+ if (applet_fl_test(appctx, (APPCTX_FL_INBLK_FULL|APPCTX_FL_INBLK_ALLOC)))
+ goto end;
+
+ if (!count)
+ goto end;
+
+ if (!appctx_get_buf(appctx, &appctx->inbuf)) {
+ TRACE_STATE("waiting for appctx inbuf allocation", APPLET_EV_SEND|APPLET_EV_BLK, appctx);
+ goto end;
+ }
+
+ ret = appctx->applet->snd_buf(appctx, buf, count, flags);
+
+ end:
+ if (applet_fl_test(appctx, (APPCTX_FL_ERROR|APPCTX_FL_ERR_PENDING))) {
+ BUG_ON((applet_fl_get(appctx) & (APPCTX_FL_EOS|APPCTX_FL_ERROR|APPCTX_FL_ERR_PENDING)) == (APPCTX_FL_EOS|APPCTX_FL_ERR_PENDING));
+ applet_set_error(appctx);
+ TRACE_STATE("report ERR_PENDING/ERROR to SE", APPLET_EV_SEND, appctx);
+ }
+ TRACE_LEAVE(APPLET_EV_SEND, appctx);
+ return ret;
+}
+
+int appctx_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags)
+{
+ struct appctx *appctx = __sc_appctx(sc);
+ struct xref *peer;
+ struct sedesc *sdo = NULL;
+ unsigned int len, nego_flags = NEGO_FF_FL_NONE;
+ int ret = 0;
+
+ TRACE_ENTER(APPLET_EV_RECV, appctx);
+
+ applet_fl_set(appctx, APPCTX_FL_FASTFWD);
+
+ /* TODO: outbuf must be empty. Find a better way to handle that but for now just return -1 */
+ if (b_data(&appctx->outbuf)) {
+ TRACE_STATE("Output buffer not empty, cannot fast-forward data", APPLET_EV_RECV, appctx);
+ return -1;
+ }
+
+ peer = xref_get_peer_and_lock(&appctx->sedesc->xref);
+ if (!peer) {
+ TRACE_STATE("Opposite endpoint not available yet", APPLET_EV_RECV, appctx);
+ goto end;
+ }
+ sdo = container_of(peer, struct sedesc, xref);
+ xref_unlock(&appctx->sedesc->xref, peer);
+
+ if (appctx->to_forward && count > appctx->to_forward) {
+ count = appctx->to_forward;
+ nego_flags |= NEGO_FF_FL_EXACT_SIZE;
+ }
- /* allocation requested ? */
- if (!(sc->flags & SC_FL_NEED_BUFF))
- return 0;
+ len = se_nego_ff(sdo, &BUF_NULL, count, nego_flags);
+ if (sdo->iobuf.flags & IOBUF_FL_NO_FF) {
+ sc_ep_clr(sc, SE_FL_MAY_FASTFWD_PROD);
+ applet_fl_clr(appctx, APPCTX_FL_FASTFWD);
+ TRACE_DEVEL("Fast-forwarding not supported by opposite endpoint, disable it", APPLET_EV_RECV, appctx);
+ goto end;
+ }
+ if (sdo->iobuf.flags & IOBUF_FL_FF_BLOCKED) {
+ sc_ep_set(sc, /* SE_FL_RCV_MORE | */SE_FL_WANT_ROOM);
+ TRACE_STATE("waiting for more room", APPLET_EV_RECV|APPLET_EV_BLK, appctx);
+ goto end;
+ }
+
+ b_add(sdo->iobuf.buf, sdo->iobuf.offset);
+ ret = appctx->applet->fastfwd(appctx, sdo->iobuf.buf, len, 0);
+ b_sub(sdo->iobuf.buf, sdo->iobuf.offset);
+ sdo->iobuf.data += ret;
+
+ if (se_fl_test(appctx->sedesc, SE_FL_WANT_ROOM)) {
+ /* The applet request more room, report the info at the iobuf level */
+ sdo->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
+ TRACE_STATE("waiting for more room", APPLET_EV_RECV|APPLET_EV_BLK, appctx);
+ }
+
+ if (applet_fl_test(appctx, APPCTX_FL_EOI)) {
+ se_fl_set(appctx->sedesc, SE_FL_EOI);
+ sdo->iobuf.flags |= IOBUF_FL_EOI; /* TODO: it may be good to have a flag to be sure we can
+ * forward the EOI the to consumer side
+ */
+ TRACE_STATE("report EOI to SE", APPLET_EV_RECV|APPLET_EV_BLK, appctx);
+ }
+ if (applet_fl_test(appctx, APPCTX_FL_EOS)) {
+ se_fl_set(appctx->sedesc, SE_FL_EOS);
+ TRACE_STATE("report EOS to SE", APPLET_EV_RECV|APPLET_EV_BLK, appctx);
+ }
+ if (applet_fl_test(appctx, APPCTX_FL_ERROR)) {
+ se_fl_set(appctx->sedesc, SE_FL_ERROR);
+ TRACE_STATE("report ERROR to SE", APPLET_EV_RECV|APPLET_EV_BLK, appctx);
+ }
+ /* else */
+ /* applet_have_more_data(appctx); */
- sc_have_buff(sc);
+ if (se_done_ff(sdo) != 0) {
+ /* Something was forwarding, don't reclaim more room */
+ se_fl_clr(appctx->sedesc, SE_FL_WANT_ROOM);
+ TRACE_STATE("more room available", APPLET_EV_RECV|APPLET_EV_BLK, appctx);
+ }
+
+end:
+ TRACE_LEAVE(APPLET_EV_RECV, appctx);
+ return ret;
+}
- /* was already allocated another way ? if so, don't take this one */
- if (c_size(sc_ic(sc)) || sc_ep_have_ff_data(sc_opposite(sc)))
- return 0;
+/* Atomically append a line to applet <ctx>'s output, appending a trailing LF.
+ * The line is read from vectors <v1> and <v2> at offset <ofs> relative to the
+ * area's origin, for <len> bytes. It returns the number of bytes consumed from
+ * the input vectors on success, -1 if it temporarily cannot (buffer full), -2
+ * if it will never be able to (too large msg). The vectors are not modified.
+ * The caller is responsible for making sure that there are at least ofs+len
+ * bytes in the input vectors.
+ */
+ssize_t applet_append_line(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len)
+{
+ struct appctx *appctx = ctx;
- /* allocation possible now ? */
- if (!b_alloc(&sc_ic(sc)->buf)) {
- sc_need_buff(sc);
- return 0;
+ if (unlikely(len + 1 > b_size(&trash))) {
+ /* too large a message to ever fit, let's skip it */
+ return -2;
}
- task_wakeup(appctx->t, TASK_WOKEN_RES);
- return 1;
+ chunk_reset(&trash);
+ vp_peek_ofs(v1, v2, ofs, trash.area, len);
+ trash.data += len;
+ trash.area[trash.data++] = '\n';
+ if (applet_putchk(appctx, &trash) == -1)
+ return -1;
+ return len;
}
/* Default applet handler */
@@ -404,13 +758,14 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state)
{
struct appctx *app = context;
struct stconn *sc, *sco;
+ struct channel *ic, *oc;
unsigned int rate;
- size_t count;
+ size_t input, output;
int did_send = 0;
TRACE_ENTER(APPLET_EV_PROCESS, app);
- if (app->state & APPLET_WANT_DIE) {
+ if (applet_fl_test(app, APPCTX_FL_WANT_DIE)) {
TRACE_DEVEL("APPCTX want die, release it", APPLET_EV_FREE, app);
__appctx_free(app);
return NULL;
@@ -434,6 +789,9 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state)
sc = appctx_sc(app);
sco = sc_opposite(sc);
+ ic = sc_ic(sc);
+ oc = sc_oc(sc);
+
/* We always pretend the applet can't get and doesn't want to
* put, it's up to it to change this if needed. This ensures
* that one applet which ignores any event will not spin.
@@ -450,7 +808,10 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state)
if (!sc_alloc_ibuf(sc, &app->buffer_wait))
applet_have_more_data(app);
- count = co_data(sc_oc(sc));
+ channel_check_idletimer(ic);
+
+ input = ic->total;
+ output = co_data(oc);
app->applet->fct(app);
TRACE_POINT(APPLET_EV_PROCESS, app);
@@ -458,9 +819,9 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state)
/* now check if the applet has released some room and forgot to
* notify the other side about it.
*/
- if (count != co_data(sc_oc(sc))) {
- sc_oc(sc)->flags |= CF_WRITE_EVENT | CF_WROTE_DATA;
- if (sco->room_needed < 0 || channel_recv_max(sc_oc(sc)) >= sco->room_needed)
+ if (output != co_data(oc)) {
+ oc->flags |= CF_WRITE_EVENT | CF_WROTE_DATA;
+ if (sco->room_needed < 0 || channel_recv_max(oc) >= sco->room_needed)
sc_have_room(sco);
did_send = 1;
}
@@ -469,14 +830,18 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state)
sc_have_room(sco);
}
- if (sc_ic(sc)->flags & CF_READ_EVENT)
+ input = ic->total - input;
+ if (input) {
+ channel_check_xfer(ic, input);
sc_ep_report_read_activity(sc);
+ }
+ /* TODO: May be move in appctx_rcv_buf or sc_applet_process ? */
if (sc_waiting_room(sc) && (sc->flags & SC_FL_ABRT_DONE)) {
sc_ep_set(sc, SE_FL_EOS|SE_FL_ERROR);
}
- if (!co_data(sc_oc(sc))) {
+ if (!co_data(oc)) {
if (did_send)
sc_ep_report_send_activity(sc);
}
@@ -495,7 +860,109 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state)
}
sc->app_ops->wake(sc);
- channel_release_buffer(sc_ic(sc), &app->buffer_wait);
+ channel_release_buffer(ic, &app->buffer_wait);
+ TRACE_LEAVE(APPLET_EV_PROCESS, app);
+ return t;
+}
+
+
+/* Default applet handler based on IN/OUT buffers. It is a true task here, no a tasklet */
+struct task *task_process_applet(struct task *t, void *context, unsigned int state)
+{
+ struct appctx *app = context;
+ struct stconn *sc;
+ unsigned int rate;
+
+ TRACE_ENTER(APPLET_EV_PROCESS, app);
+
+ if (applet_fl_test(app, APPCTX_FL_WANT_DIE)) {
+ TRACE_DEVEL("APPCTX want die, release it", APPLET_EV_FREE, app);
+ __appctx_free(app);
+ return NULL;
+ }
+
+ if (se_fl_test(app->sedesc, SE_FL_ORPHAN)) {
+ /* Finalize init of orphan appctx. .init callback function must
+ * be defined and it must finalize appctx startup.
+ */
+ BUG_ON(!app->applet->init);
+
+ if (appctx_init(app) == -1) {
+ TRACE_DEVEL("APPCTX init failed", APPLET_EV_FREE|APPLET_EV_ERR, app);
+ appctx_free_on_early_error(app);
+ return NULL;
+ }
+ BUG_ON(!app->sess || !appctx_sc(app) || !appctx_strm(app));
+ TRACE_DEVEL("APPCTX initialized", APPLET_EV_PROCESS, app);
+ }
+
+ sc = appctx_sc(app);
+
+ sc_applet_sync_send(sc);
+
+ /* We always pretend the applet can't get and doesn't want to
+ * put, it's up to it to change this if needed. This ensures
+ * that one applet which ignores any event will not spin.
+ */
+ applet_need_more_data(app);
+ applet_have_no_more_data(app);
+
+ app->applet->fct(app);
+
+ TRACE_POINT(APPLET_EV_PROCESS, app);
+
+ if (b_data(&app->outbuf) || se_fl_test(app->sedesc, SE_FL_MAY_FASTFWD_PROD) ||
+ applet_fl_test(app, APPCTX_FL_EOI|APPCTX_FL_EOS|APPCTX_FL_ERROR))
+ applet_have_more_data(app);
+
+ sc_applet_sync_recv(sc);
+
+ /* TODO: May be move in appctx_rcv_buf or sc_applet_process ? */
+ if (sc_waiting_room(sc) && (sc->flags & SC_FL_ABRT_DONE)) {
+ sc_ep_set(sc, SE_FL_EOS|SE_FL_ERROR);
+ }
+
+ /* measure the call rate and check for anomalies when too high */
+ if (((b_size(sc_ib(sc)) && sc->flags & SC_FL_NEED_BUFF) || // asks for a buffer which is present
+ (b_size(sc_ib(sc)) && !b_data(sc_ib(sc)) && sc->flags & SC_FL_NEED_ROOM) || // asks for room in an empty buffer
+ (b_data(sc_ob(sc)) && sc_is_send_allowed(sc)) || // asks for data already present
+ (!b_data(sc_ib(sc)) && b_data(sc_ob(sc)) && // didn't return anything ...
+ (!(sc_oc(sc)->flags & CF_WRITE_EVENT) && (sc->flags & SC_FL_SHUT_WANTED))))) { // ... and left data pending after a shut
+ rate = update_freq_ctr(&app->call_rate, 1);
+ if (rate >= 100000 && app->call_rate.prev_ctr) // looped like this more than 100k times over last second
+ stream_dump_and_crash(&app->obj_type, read_freq_ctr(&app->call_rate));
+ }
+
+ sc->app_ops->wake(sc);
+ appctx_release_buffers(app);
TRACE_LEAVE(APPLET_EV_PROCESS, app);
return t;
}
+
+/* config parser for global "tune.applet.zero-copy-forwarding" */
+static int cfg_parse_applet_zero_copy_fwd(char **args, int section_type, struct proxy *curpx,
+ const struct proxy *defpx, const char *file, int line,
+ char **err)
+{
+ if (too_many_args(1, args, err, NULL))
+ return -1;
+
+ if (strcmp(args[1], "on") == 0)
+ global.tune.no_zero_copy_fwd &= ~NO_ZERO_COPY_FWD_APPLET;
+ else if (strcmp(args[1], "off") == 0)
+ global.tune.no_zero_copy_fwd |= NO_ZERO_COPY_FWD_APPLET;
+ else {
+ memprintf(err, "'%s' expects 'on' or 'off'.", args[0]);
+ return -1;
+ }
+ return 0;
+}
+
+
+/* config keyword parsers */
+static struct cfg_kw_list cfg_kws = {ILH, {
+ { CFG_GLOBAL, "tune.applet.zero-copy-forwarding", cfg_parse_applet_zero_copy_fwd },
+ { 0, NULL, NULL }
+}};
+
+INITCALL1(STG_REGISTER, cfg_register_keywords, &cfg_kws);