summaryrefslogtreecommitdiffstats
path: root/src/stconn.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-06-03 05:11:10 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-06-03 05:11:10 +0000
commitcff6d757e3ba609c08ef2aaa00f07e53551e5bf6 (patch)
tree08c4fc3255483ad397d712edb4214ded49149fd9 /src/stconn.c
parentAdding upstream version 2.9.7. (diff)
downloadhaproxy-upstream/3.0.0.tar.xz
haproxy-upstream/3.0.0.zip
Adding upstream version 3.0.0.upstream/3.0.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/stconn.c')
-rw-r--r--src/stconn.c614
1 files changed, 517 insertions, 97 deletions
diff --git a/src/stconn.c b/src/stconn.c
index df119a1..6077403 100644
--- a/src/stconn.c
+++ b/src/stconn.c
@@ -14,6 +14,7 @@
#include <haproxy/applet.h>
#include <haproxy/connection.h>
#include <haproxy/check.h>
+#include <haproxy/filters.h>
#include <haproxy/http_ana.h>
#include <haproxy/pipe.h>
#include <haproxy/pool.h>
@@ -99,6 +100,9 @@ void sedesc_init(struct sedesc *sedesc)
sedesc->xref.peer = NULL;
se_fl_setall(sedesc, SE_FL_NONE);
+ sedesc->abort_info.info = 0;
+ sedesc->abort_info.code = 0;
+
sedesc->iobuf.pipe = NULL;
sedesc->iobuf.buf = NULL;
sedesc->iobuf.offset = sedesc->iobuf.data = 0;
@@ -130,6 +134,54 @@ void sedesc_free(struct sedesc *sedesc)
}
}
+/* Performs a shutdown on the endpoint. This function deals with connection and
+ * applet endpoints. It is responsible to set SE flags corresponding to the
+ * given shut modes and to call right shutdown functions of the endpoint. It is
+ * called from the .abort and .shut app_ops callback functions at the SC level.
+ */
+void se_shutdown(struct sedesc *sedesc, enum se_shut_mode mode)
+{
+ if (se_fl_test(sedesc, SE_FL_T_MUX)) {
+ const struct mux_ops *mux = (sedesc->conn ? sedesc->conn->mux : NULL);
+ unsigned int flags = 0;
+
+ if ((mode & (SE_SHW_SILENT|SE_SHW_NORMAL)) && !se_fl_test(sedesc, SE_FL_SHW))
+ flags |= (mode & SE_SHW_NORMAL) ? SE_FL_SHWN : SE_FL_SHWS;
+
+
+ if ((mode & (SE_SHR_RESET|SE_SHR_DRAIN)) && !se_fl_test(sedesc, SE_FL_SHR))
+ flags |= (mode & SE_SHR_DRAIN) ? SE_FL_SHRD : SE_FL_SHRR;
+
+ if (flags) {
+ if (mux && mux->shut) {
+ struct se_abort_info *reason = NULL;
+ struct xref *peer = xref_get_peer_and_lock(&sedesc->xref);
+
+ if (peer) {
+ struct sedesc *sdo = container_of(peer, struct sedesc, xref);
+
+ reason = &sdo->abort_info;
+ xref_unlock(&sedesc->xref, peer);
+ }
+
+ mux->shut(sedesc->sc, mode, reason);
+
+ }
+ se_fl_set(sedesc, flags);
+ }
+ }
+ else if (se_fl_test(sedesc, SE_FL_T_APPLET)) {
+ if ((mode & (SE_SHW_SILENT|SE_SHW_NORMAL)) && !se_fl_test(sedesc, SE_FL_SHW))
+ se_fl_set(sedesc, SE_FL_SHWN);
+
+ if ((mode & (SE_SHR_RESET|SE_SHR_DRAIN)) && !se_fl_test(sedesc, SE_FL_SHR))
+ se_fl_set(sedesc, SE_FL_SHRR);
+
+ if (se_fl_test(sedesc, SE_FL_SHR) && se_fl_test(sedesc, SE_FL_SHW))
+ appctx_shut(sedesc->se);
+ }
+}
+
/* Tries to allocate a new stconn and initialize its main fields. On
* failure, nothing is allocated and NULL is returned. It is an internal
* function. The caller must, at least, set the SE_FL_ORPHAN or SE_FL_DETACHED
@@ -312,15 +364,17 @@ int sc_attach_mux(struct stconn *sc, void *sd, void *ctx)
* removed. This function is called by a stream when a backend applet is
* registered.
*/
-static void sc_attach_applet(struct stconn *sc, void *sd)
+static int sc_attach_applet(struct stconn *sc, struct appctx *appctx)
{
- sc->sedesc->se = sd;
+ sc->sedesc->se = appctx;
sc_ep_set(sc, SE_FL_T_APPLET);
sc_ep_clr(sc, SE_FL_DETACHED);
if (sc_strm(sc)) {
sc->app_ops = &sc_app_applet_ops;
xref_create(&sc->sedesc->xref, &sc_opposite(sc)->sedesc->xref);
}
+
+ return 0;
}
/* Attaches a stconn to a app layer and sets the relevant
@@ -402,7 +456,7 @@ static void sc_detach_endp(struct stconn **scp)
sc_ep_set(sc, SE_FL_ORPHAN);
sc->sedesc->sc = NULL;
sc->sedesc = NULL;
- appctx_shut(appctx);
+ se_shutdown(appctx->sedesc, SE_SHR_RESET|SE_SHW_NORMAL);
appctx_free(appctx);
}
@@ -506,7 +560,10 @@ struct appctx *sc_applet_create(struct stconn *sc, struct applet *app)
appctx = appctx_new_here(app, sc->sedesc);
if (!appctx)
return NULL;
- sc_attach_applet(sc, appctx);
+ if (sc_attach_applet(sc, appctx) == -1) {
+ appctx_free_on_early_error(appctx);
+ return NULL;
+ }
appctx->t->nice = __sc_strm(sc)->task->nice;
applet_need_more_data(appctx);
appctx_wakeup(appctx);
@@ -612,21 +669,24 @@ static void sc_app_shut(struct stconn *sc)
!(ic->flags & CF_DONT_READ))
return;
- __fallthrough;
+ sc->state = SC_ST_DIS;
+ break;
case SC_ST_CON:
case SC_ST_CER:
case SC_ST_QUE:
case SC_ST_TAR:
/* Note that none of these states may happen with applets */
sc->state = SC_ST_DIS;
- __fallthrough;
+ break;
default:
- sc->flags &= ~SC_FL_NOLINGER;
- sc->flags |= SC_FL_ABRT_DONE;
- if (sc->flags & SC_FL_ISBACK)
- __sc_strm(sc)->conn_exp = TICK_ETERNITY;
+ break;
}
+ sc->flags &= ~SC_FL_NOLINGER;
+ sc->flags |= SC_FL_ABRT_DONE;
+ if (sc->flags & SC_FL_ISBACK)
+ __sc_strm(sc)->conn_exp = TICK_ETERNITY;
+
/* note that if the task exists, it must unregister itself once it runs */
if (!(sc->flags & SC_FL_DONT_WAKE))
task_wakeup(sc_strm_task(sc), TASK_WOKEN_IO);
@@ -691,7 +751,7 @@ static void sc_app_abort_conn(struct stconn *sc)
return;
if (sc->flags & SC_FL_SHUT_DONE) {
- sc_conn_shut(sc);
+ se_shutdown(sc->sedesc, SE_SHR_RESET|SE_SHW_SILENT);
sc->state = SC_ST_DIS;
if (sc->flags & SC_FL_ISBACK)
__sc_strm(sc)->conn_exp = TICK_ETERNITY;
@@ -725,51 +785,42 @@ static void sc_app_shut_conn(struct stconn *sc)
switch (sc->state) {
case SC_ST_RDY:
case SC_ST_EST:
+
/* we have to shut before closing, otherwise some short messages
* may never leave the system, especially when there are remaining
* unread data in the socket input buffer, or when nolinger is set.
* However, if SC_FL_NOLINGER is explicitly set, we know there is
* no risk so we close both sides immediately.
*/
- if (sc->flags & SC_FL_NOLINGER) {
- /* unclean data-layer shutdown, typically an aborted request
- * or a forwarded shutdown from a client to a server due to
- * option abortonclose. No need for the TLS layer to try to
- * emit a shutdown message.
- */
- sc_conn_shutw(sc, CO_SHW_SILENT);
+ if (!(sc->flags & (SC_FL_NOLINGER|SC_FL_EOS|SC_FL_ABRT_DONE)) && !(ic->flags & CF_DONT_READ)) {
+ se_shutdown(sc->sedesc, SE_SHW_NORMAL);
+ return;
}
- else {
- /* clean data-layer shutdown. This only happens on the
- * frontend side, or on the backend side when forwarding
- * a client close in TCP mode or in HTTP TUNNEL mode
- * while option abortonclose is set. We want the TLS
- * layer to try to signal it to the peer before we close.
- */
- sc_conn_shutw(sc, CO_SHW_NORMAL);
- if (!(sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) && !(ic->flags & CF_DONT_READ))
- return;
- }
+ se_shutdown(sc->sedesc, SE_SHR_RESET|((sc->flags & SC_FL_NOLINGER) ? SE_SHW_SILENT : SE_SHW_NORMAL));
+ sc->state = SC_ST_DIS;
+ break;
- __fallthrough;
case SC_ST_CON:
/* we may have to close a pending connection, and mark the
* response buffer as abort
*/
- sc_conn_shut(sc);
- __fallthrough;
+ se_shutdown(sc->sedesc, SE_SHR_RESET|SE_SHW_SILENT);
+ sc->state = SC_ST_DIS;
+ break;
case SC_ST_CER:
case SC_ST_QUE:
case SC_ST_TAR:
sc->state = SC_ST_DIS;
- __fallthrough;
+ break;
default:
- sc->flags &= ~SC_FL_NOLINGER;
- sc->flags |= SC_FL_ABRT_DONE;
- if (sc->flags & SC_FL_ISBACK)
- __sc_strm(sc)->conn_exp = TICK_ETERNITY;
+ break;
}
+
+ sc->flags &= ~SC_FL_NOLINGER;
+ sc->flags |= SC_FL_ABRT_DONE;
+ if (sc->flags & SC_FL_ISBACK)
+ __sc_strm(sc)->conn_exp = TICK_ETERNITY;
}
/* This function is used for inter-stream connector calls. It is called by the
@@ -884,7 +935,7 @@ static void sc_app_abort_applet(struct stconn *sc)
return;
if (sc->flags & SC_FL_SHUT_DONE) {
- appctx_shut(__sc_appctx(sc));
+ se_shutdown(sc->sedesc, SE_SHR_RESET|SE_SHW_NORMAL);
sc->state = SC_ST_DIS;
if (sc->flags & SC_FL_ISBACK)
__sc_strm(sc)->conn_exp = TICK_ETERNITY;
@@ -920,6 +971,7 @@ static void sc_app_shut_applet(struct stconn *sc)
switch (sc->state) {
case SC_ST_RDY:
case SC_ST_EST:
+
/* we have to shut before closing, otherwise some short messages
* may never leave the system, especially when there are remaining
* unread data in the socket input buffer, or when nolinger is set.
@@ -927,24 +979,31 @@ static void sc_app_shut_applet(struct stconn *sc)
* no risk so we close both sides immediately.
*/
if (!(sc->flags & (SC_FL_ERROR|SC_FL_NOLINGER|SC_FL_EOS|SC_FL_ABRT_DONE)) &&
- !(ic->flags & CF_DONT_READ))
+ !(ic->flags & CF_DONT_READ)) {
+ se_shutdown(sc->sedesc, SE_SHW_NORMAL);
return;
+ }
+
+ se_shutdown(sc->sedesc, SE_SHR_RESET|SE_SHW_NORMAL);
+ sc->state = SC_ST_DIS;
+ break;
- __fallthrough;
case SC_ST_CON:
case SC_ST_CER:
case SC_ST_QUE:
case SC_ST_TAR:
/* Note that none of these states may happen with applets */
- appctx_shut(__sc_appctx(sc));
+ se_shutdown(sc->sedesc, SE_SHR_RESET|SE_SHW_NORMAL);
sc->state = SC_ST_DIS;
- __fallthrough;
+ break;
default:
- sc->flags &= ~SC_FL_NOLINGER;
- sc->flags |= SC_FL_ABRT_DONE;
- if (sc->flags & SC_FL_ISBACK)
- __sc_strm(sc)->conn_exp = TICK_ETERNITY;
+ break;
}
+
+ sc->flags &= ~SC_FL_NOLINGER;
+ sc->flags |= SC_FL_ABRT_DONE;
+ if (sc->flags & SC_FL_ISBACK)
+ __sc_strm(sc)->conn_exp = TICK_ETERNITY;
}
/* chk_rcv function for applets */
@@ -1095,6 +1154,7 @@ void sc_notify(struct stconn *sc)
*/
if (sc_ep_have_ff_data(sc_opposite(sc)) ||
(co_data(ic) && sc_ep_test(sco, SE_FL_WAIT_DATA) &&
+ (!HAS_DATA_FILTERS(__sc_strm(sc), ic) || channel_input_data(ic) == 0) &&
(!(sc->flags & SC_FL_SND_EXP_MORE) || channel_full(ic, co_data(ic)) || channel_input_data(ic) == 0))) {
int new_len, last_len;
@@ -1185,7 +1245,6 @@ static void sc_conn_eos(struct stconn *sc)
if (sc_cond_forward_shut(sc)) {
/* we want to immediately forward this close to the write side */
/* force flag on ssl to keep stream in cache */
- sc_conn_shutw(sc, CO_SHW_SILENT);
goto do_close;
}
@@ -1194,7 +1253,7 @@ static void sc_conn_eos(struct stconn *sc)
do_close:
/* OK we completely close the socket here just as if we went through sc_shut[rw]() */
- sc_conn_shut(sc);
+ se_shutdown(sc->sedesc, SE_SHR_RESET|SE_SHW_SILENT);
sc->flags &= ~SC_FL_SHUT_WANTED;
sc->flags |= SC_FL_SHUT_DONE;
@@ -1253,17 +1312,7 @@ int sc_conn_recv(struct stconn *sc)
/* prepare to detect if the mux needs more room */
sc_ep_clr(sc, SE_FL_WANT_ROOM);
- if ((ic->flags & (CF_STREAMER | CF_STREAMER_FAST)) && !co_data(ic) &&
- global.tune.idle_timer &&
- (unsigned short)(now_ms - ic->last_read) >= global.tune.idle_timer) {
- /* The buffer was empty and nothing was transferred for more
- * than one second. This was caused by a pause and not by
- * congestion. Reset any streaming mode to reduce latency.
- */
- ic->xfer_small = 0;
- ic->xfer_large = 0;
- ic->flags &= ~(CF_STREAMER | CF_STREAMER_FAST);
- }
+ channel_check_idletimer(ic);
#if defined(USE_LINUX_SPLICE)
/* Detect if the splicing is possible depending on the stream policy */
@@ -1448,41 +1497,7 @@ int sc_conn_recv(struct stconn *sc)
if (!cur_read)
se_have_no_more_data(sc->sedesc);
else {
- if ((ic->flags & (CF_STREAMER | CF_STREAMER_FAST)) &&
- (cur_read <= ic->buf.size / 2)) {
- ic->xfer_large = 0;
- ic->xfer_small++;
- if (ic->xfer_small >= 3) {
- /* we have read less than half of the buffer in
- * one pass, and this happened at least 3 times.
- * This is definitely not a streamer.
- */
- ic->flags &= ~(CF_STREAMER | CF_STREAMER_FAST);
- }
- else if (ic->xfer_small >= 2) {
- /* if the buffer has been at least half full twice,
- * we receive faster than we send, so at least it
- * is not a "fast streamer".
- */
- ic->flags &= ~CF_STREAMER_FAST;
- }
- }
- else if (!(ic->flags & CF_STREAMER_FAST) && (cur_read >= channel_data_limit(ic))) {
- /* we read a full buffer at once */
- ic->xfer_small = 0;
- ic->xfer_large++;
- if (ic->xfer_large >= 3) {
- /* we call this buffer a fast streamer if it manages
- * to be filled in one call 3 consecutive times.
- */
- ic->flags |= (CF_STREAMER | CF_STREAMER_FAST);
- }
- }
- else {
- ic->xfer_small = 0;
- ic->xfer_large = 0;
- }
- ic->last_read = now_ms;
+ channel_check_xfer(ic, cur_read);
sc_ep_report_read_activity(sc);
}
@@ -1660,7 +1675,7 @@ int sc_conn_send(struct stconn *sc)
if (s->txn->req.msg_state != HTTP_MSG_DONE)
s->txn->flags &= ~TX_L7_RETRY;
else {
- if (b_alloc(&s->txn->l7_buffer) == NULL)
+ if (b_alloc(&s->txn->l7_buffer, DB_UNLIKELY) == NULL)
s->txn->flags &= ~TX_L7_RETRY;
else {
memcpy(b_orig(&s->txn->l7_buffer),
@@ -1673,6 +1688,9 @@ int sc_conn_send(struct stconn *sc)
}
}
+ if ((sc->flags & SC_FL_SHUT_WANTED) && co_data(oc) == c_data(oc))
+ send_flag |= CO_SFL_LAST_DATA;
+
ret = conn->mux->snd_buf(sc, &oc->buf, co_data(oc), send_flag);
if (ret > 0) {
did_send = 1;
@@ -1899,7 +1917,7 @@ static void sc_applet_eos(struct stconn *sc)
return;
if (sc->flags & SC_FL_SHUT_DONE) {
- appctx_shut(__sc_appctx(sc));
+ se_shutdown(sc->sedesc, SE_SHR_RESET|SE_SHW_NORMAL);
sc->state = SC_ST_DIS;
if (sc->flags & SC_FL_ISBACK)
__sc_strm(sc)->conn_exp = TICK_ETERNITY;
@@ -1908,6 +1926,352 @@ static void sc_applet_eos(struct stconn *sc)
return sc_app_shut_applet(sc);
}
+/*
+ * This is the callback which is called by the applet layer to receive data into
+ * the buffer from the appctx. It iterates over the applet's rcv_buf
+ * function. Please do not statify this function, it's often present in
+ * backtraces, it's useful to recognize it.
+ */
+int sc_applet_recv(struct stconn *sc)
+{
+ struct appctx *appctx = __sc_appctx(sc);
+ struct channel *ic = sc_ic(sc);
+ int ret, max, cur_read = 0;
+ int read_poll = MAX_READ_POLL_LOOPS;
+ int flags = 0;
+
+
+ /* If another call to sc_applet_recv() failed, give up now.
+ */
+ if (sc_waiting_room(sc))
+ return 0;
+
+ /* maybe we were called immediately after an asynchronous abort */
+ if (sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE))
+ return 1;
+
+ /* We must wait because the applet is not fully initialized */
+ if (se_fl_test(sc->sedesc, SE_FL_ORPHAN))
+ return 0;
+
+ /* stop immediately on errors. */
+ if (!sc_ep_test(sc, SE_FL_RCV_MORE)) {
+ // TODO: be sure SE_FL_RCV_MORE may be set for applet ?
+ if (sc_ep_test(sc, SE_FL_ERROR))
+ goto end_recv;
+ }
+
+ /* prepare to detect if the mux needs more room */
+ sc_ep_clr(sc, SE_FL_WANT_ROOM);
+
+ channel_check_idletimer(ic);
+
+ /* First, let's see if we may fast-forward data from a side to the other
+ * one without using the channel buffer.
+ */
+ if (sc_is_fastfwd_supported(sc)) {
+ if (channel_data(ic)) {
+ /* We're embarrassed, there are already data pending in
+ * the buffer and we don't want to have them at two
+ * locations at a time. Let's indicate we need some
+ * place and ask the consumer to hurry.
+ */
+ flags |= CO_RFL_BUF_FLUSH;
+ goto abort_fastfwd;
+ }
+ ret = appctx_fastfwd(sc, ic->to_forward, flags);
+ if (ret < 0)
+ goto abort_fastfwd;
+ else if (ret > 0) {
+ if (ic->to_forward != CHN_INFINITE_FORWARD)
+ ic->to_forward -= ret;
+ ic->total += ret;
+ cur_read += ret;
+ ic->flags |= CF_READ_EVENT;
+ }
+
+ if (sc_ep_test(sc, SE_FL_EOS | SE_FL_ERROR))
+ goto end_recv;
+
+ if (sc_ep_test(sc, SE_FL_WANT_ROOM))
+ sc_need_room(sc, -1);
+
+ if (sc_ep_test(sc, SE_FL_MAY_FASTFWD_PROD) && ic->to_forward)
+ goto done_recv;
+ }
+
+ abort_fastfwd:
+ if (!sc_alloc_ibuf(sc, &appctx->buffer_wait))
+ goto end_recv;
+
+ /* For an HTX stream, if the buffer is stuck (no output data with some
+ * input data) and if the HTX message is fragmented or if its free space
+ * wraps, we force an HTX deframentation. It is a way to have a
+ * contiguous free space nad to let the mux to copy as much data as
+ * possible.
+ *
+ * NOTE: A possible optim may be to let the mux decides if defrag is
+ * required or not, depending on amount of data to be xferred.
+ */
+ if (IS_HTX_STRM(__sc_strm(sc)) && !co_data(ic)) {
+ struct htx *htx = htxbuf(&ic->buf);
+
+ if (htx_is_not_empty(htx) && ((htx->flags & HTX_FL_FRAGMENTED) || htx_space_wraps(htx)))
+ htx_defrag(htx, NULL, 0);
+ }
+
+ /* Compute transient CO_RFL_* flags */
+ if (co_data(ic)) {
+ flags |= (CO_RFL_BUF_WET | CO_RFL_BUF_NOT_STUCK);
+ }
+
+ /* <max> may be null. This is the mux responsibility to set
+ * SE_FL_RCV_MORE on the SC if more space is needed.
+ */
+ max = channel_recv_max(ic);
+ ret = appctx_rcv_buf(sc, &ic->buf, max, flags);
+ if (sc_ep_test(sc, SE_FL_WANT_ROOM)) {
+ /* SE_FL_WANT_ROOM must not be reported if the channel's
+ * buffer is empty.
+ */
+ BUG_ON(c_empty(ic));
+
+ sc_need_room(sc, channel_recv_max(ic) + 1);
+ /* Add READ_PARTIAL because some data are pending but
+ * cannot be xferred to the channel
+ */
+ ic->flags |= CF_READ_EVENT;
+ sc_ep_report_read_activity(sc);
+ }
+
+ if (ret <= 0) {
+ /* if we refrained from reading because we asked for a flush to
+ * satisfy rcv_pipe(), report that there's not enough room here
+ * to proceed.
+ */
+ if (flags & CO_RFL_BUF_FLUSH)
+ sc_need_room(sc, -1);
+ goto done_recv;
+ }
+
+ cur_read += ret;
+
+ /* if we're allowed to directly forward data, we must update ->o */
+ if (ic->to_forward && !(sc_opposite(sc)->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED))) {
+ unsigned long fwd = ret;
+ if (ic->to_forward != CHN_INFINITE_FORWARD) {
+ if (fwd > ic->to_forward)
+ fwd = ic->to_forward;
+ ic->to_forward -= fwd;
+ }
+ c_adv(ic, fwd);
+ }
+
+ ic->flags |= CF_READ_EVENT;
+ ic->total += ret;
+
+ /* End-of-input reached, we can leave. In this case, it is
+ * important to break the loop to not block the SC because of
+ * the channel's policies.This way, we are still able to receive
+ * shutdowns.
+ */
+ if (sc_ep_test(sc, SE_FL_EOI))
+ goto done_recv;
+
+ if ((sc->flags & SC_FL_RCV_ONCE) || --read_poll <= 0) {
+ /* we don't expect to read more data */
+ sc_wont_read(sc);
+ goto done_recv;
+ }
+
+ /* if too many bytes were missing from last read, it means that
+ * it's pointless trying to read again because the system does
+ * not have them in buffers.
+ */
+ if (ret < max) {
+ /* if a streamer has read few data, it may be because we
+ * have exhausted system buffers. It's not worth trying
+ * again.
+ */
+ if (ic->flags & CF_STREAMER) {
+ /* we're stopped by the channel's policy */
+ sc_wont_read(sc);
+ goto done_recv;
+ }
+
+ /* if we read a large block smaller than what we requested,
+ * it's almost certain we'll never get anything more.
+ */
+ if (ret >= global.tune.recv_enough) {
+ /* we're stopped by the channel's policy */
+ sc_wont_read(sc);
+ }
+ }
+
+ done_recv:
+ if (cur_read) {
+ channel_check_xfer(ic, cur_read);
+ sc_ep_report_read_activity(sc);
+ }
+
+ end_recv:
+ ret = (cur_read != 0);
+
+ /* Report EOI on the channel if it was reached from the mux point of
+ * view. */
+ if (sc_ep_test(sc, SE_FL_EOI) && !(sc->flags & SC_FL_EOI)) {
+ sc_ep_report_read_activity(sc);
+ sc->flags |= SC_FL_EOI;
+ ic->flags |= CF_READ_EVENT;
+ ret = 1;
+ }
+
+ if (sc_ep_test(sc, SE_FL_EOS)) {
+ /* we received a shutdown */
+ if (ic->flags & CF_AUTO_CLOSE)
+ sc_schedule_shutdown(sc_opposite(sc));
+ sc_applet_eos(sc);
+ ret = 1;
+ }
+
+ if (sc_ep_test(sc, SE_FL_ERROR)) {
+ sc->flags |= SC_FL_ERROR;
+ ret = 1;
+ }
+ else if (cur_read || (sc->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM))) {
+ se_have_more_data(sc->sedesc);
+ ret = 1;
+ }
+
+ return ret;
+}
+
+/* This tries to perform a synchronous receive on the stream connector to
+ * try to collect last arrived data. In practice it's only implemented on
+ * stconns. Returns 0 if nothing was done, non-zero if new data or a
+ * shutdown were collected. This may result on some delayed receive calls
+ * to be programmed and performed later, though it doesn't provide any
+ * such guarantee.
+ */
+int sc_applet_sync_recv(struct stconn *sc)
+{
+ if (!(__sc_appctx(sc)->flags & APPCTX_FL_INOUT_BUFS))
+ return 0;
+
+ if (!sc_state_in(sc->state, SC_SB_RDY|SC_SB_EST))
+ return 0;
+
+ if (se_fl_test(sc->sedesc, SE_FL_ORPHAN))
+ return 0;
+
+ if (!sc_is_recv_allowed(sc))
+ return 0; // already failed
+
+ return sc_applet_recv(sc);
+}
+
+/*
+ * This function is called to send buffer data to an applet. It calls the
+ * applet's snd_buf function. Please do not statify this function, it's often
+ * present in backtraces, it's useful to recognize it.
+ */
+int sc_applet_send(struct stconn *sc)
+{
+ struct stconn *sco = sc_opposite(sc);
+ struct channel *oc = sc_oc(sc);
+ size_t ret;
+ int did_send = 0;
+
+ if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING)) {
+ BUG_ON(sc_ep_test(sc, SE_FL_EOS|SE_FL_ERROR|SE_FL_ERR_PENDING) == (SE_FL_EOS|SE_FL_ERR_PENDING));
+ return 1;
+ }
+
+ if (sc_ep_test(sc, SE_FL_WONT_CONSUME))
+ return 0;
+
+ /* we might have been called just after an asynchronous shutw */
+ if (sc->flags & SC_FL_SHUT_DONE)
+ return 1;
+
+ /* We must wait because the applet is not fully initialized */
+ if (se_fl_test(sc->sedesc, SE_FL_ORPHAN))
+ return 0;
+
+ /* TODO: Splicing is not supported, so it is not possible to have FF data stuck into the I/O buf */
+ BUG_ON(sc_ep_have_ff_data(sc));
+
+ if (co_data(oc)) {
+ unsigned int send_flag = 0;
+
+ if ((sc->flags & SC_FL_SHUT_WANTED) && co_data(oc) == c_data(oc))
+ send_flag |= CO_SFL_LAST_DATA;
+
+ ret = appctx_snd_buf(sc, &oc->buf, co_data(oc), send_flag);
+ if (ret > 0) {
+ did_send = 1;
+ c_rew(oc, ret);
+ c_realign_if_empty(oc);
+
+ if (!co_data(oc)) {
+ /* Always clear both flags once everything has been sent, they're one-shot */
+ sc->flags &= ~(SC_FL_SND_ASAP|SC_FL_SND_EXP_MORE);
+ }
+ /* if some data remain in the buffer, it's only because the
+ * system buffers are full, we will try next time.
+ */
+ }
+ }
+
+ if (did_send)
+ oc->flags |= CF_WRITE_EVENT | CF_WROTE_DATA;
+
+ if (!sco->room_needed || (did_send && (sco->room_needed < 0 || channel_recv_max(sc_oc(sc)) >= sco->room_needed)))
+ sc_have_room(sco);
+
+ if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING)) {
+ oc->flags |= CF_WRITE_EVENT;
+ BUG_ON(sc_ep_test(sc, SE_FL_EOS|SE_FL_ERROR|SE_FL_ERR_PENDING) == (SE_FL_EOS|SE_FL_ERR_PENDING));
+ if (sc_ep_test(sc, SE_FL_ERROR))
+ sc->flags |= SC_FL_ERROR;
+ return 1;
+ }
+
+ if (!co_data(oc)) {
+ if (did_send)
+ sc_ep_report_send_activity(sc);
+ }
+ else {
+ sc_ep_report_blocked_send(sc, did_send);
+ }
+
+ return did_send;
+}
+
+void sc_applet_sync_send(struct stconn *sc)
+{
+ struct channel *oc = sc_oc(sc);
+
+ oc->flags &= ~CF_WRITE_EVENT;
+
+ if (!(__sc_appctx(sc)->flags & APPCTX_FL_INOUT_BUFS))
+ return;
+
+ if (sc->flags & SC_FL_SHUT_DONE)
+ return;
+
+ if (!co_data(oc))
+ return;
+
+ if (!sc_state_in(sc->state, SC_SB_EST))
+ return;
+
+ if (se_fl_test(sc->sedesc, SE_FL_ORPHAN))
+ return;
+
+ sc_applet_send(sc);
+}
+
/* Callback to be used by applet handlers upon completion. It updates the stream
* (which may or may not take this opportunity to try to forward data), then
* may re-enable the applet's based on the channels and stream connector's final
@@ -1960,7 +2324,8 @@ int sc_applet_process(struct stconn *sc)
* appctx but in the case the task is not in runqueue we may have to
* wakeup the appctx immediately.
*/
- if (sc_is_recv_allowed(sc) || sc_is_send_allowed(sc))
+ if ((sc_is_recv_allowed(sc) && !applet_fl_test(__sc_appctx(sc), APPCTX_FL_OUTBLK_ALLOC)) ||
+ (sc_is_send_allowed(sc) && !applet_fl_test(__sc_appctx(sc), APPCTX_FL_INBLK_ALLOC)))
appctx_wakeup(__sc_appctx(sc));
return 0;
}
@@ -2036,6 +2401,57 @@ smp_fetch_sid(const struct arg *args, struct sample *smp, const char *kw, void *
return 1;
}
+/* return 1 if the frontend or backend mux stream has received an abort and 0 otherwise.
+ */
+static int
+smp_fetch_strm_aborted(const struct arg *args, struct sample *smp, const char *kw, void *private)
+{
+ struct stconn *sc;
+ unsigned int aborted = 0;
+
+ if (!smp->strm)
+ return 0;
+
+ sc = (kw[0] == 'f' ? smp->strm->scf : smp->strm->scb);
+ if (sc->sedesc->abort_info.info)
+ aborted = 1;
+
+ smp->flags = SMP_F_VOL_TXN;
+ smp->data.type = SMP_T_BOOL;
+ smp->data.u.sint = aborted;
+
+ return 1;
+}
+
+/* return the H2/QUIC RESET code of the frontend or backend mux stream. Any value
+ * means an a RST_STREAM was received on H2 and a STOP_SENDING on QUIC. Otherwise the sample fetch fails.
+ */
+static int
+smp_fetch_strm_rst_code(const struct arg *args, struct sample *smp, const char *kw, void *private)
+{
+ struct stconn *sc;
+ unsigned int source;
+ unsigned long long code = 0;
+
+ if (!smp->strm)
+ return 0;
+
+ sc = (kw[0] == 'f' ? smp->strm->scf : smp->strm->scb);
+ source = ((sc->sedesc->abort_info.info & SE_ABRT_SRC_MASK) >> SE_ABRT_SRC_SHIFT);
+ if (source != SE_ABRT_SRC_MUX_H2 && source != SE_ABRT_SRC_MUX_QUIC) {
+ if (!source)
+ smp->flags |= SMP_F_MAY_CHANGE;
+ return 0;
+ }
+ code = sc->sedesc->abort_info.code;
+
+ smp->flags = SMP_F_VOL_TXN;
+ smp->data.type = SMP_T_SINT;
+ smp->data.u.sint = code;
+
+ return 1;
+}
+
/* Note: must not be declared <const> as its list will be overwritten.
* Note: fetches that may return multiple types should be declared using the
* appropriate pseudo-type. If not available it must be declared as the lowest
@@ -2043,7 +2459,11 @@ smp_fetch_sid(const struct arg *args, struct sample *smp, const char *kw, void *
*/
static struct sample_fetch_kw_list sample_fetch_keywords = {ILH, {
{ "bs.id", smp_fetch_sid, 0, NULL, SMP_T_SINT, SMP_USE_L6REQ },
+ { "bs.aborted", smp_fetch_strm_aborted, 0, NULL, SMP_T_SINT, SMP_USE_L5SRV },
+ { "bs.rst_code", smp_fetch_strm_rst_code, 0, NULL, SMP_T_SINT, SMP_USE_L5SRV },
{ "fs.id", smp_fetch_sid, 0, NULL, SMP_T_STR, SMP_USE_L6RES },
+ { "fs.aborted", smp_fetch_strm_aborted, 0, NULL, SMP_T_SINT, SMP_USE_L5CLI },
+ { "fs.rst_code", smp_fetch_strm_rst_code, 0, NULL, SMP_T_SINT, SMP_USE_L5CLI },
{ /* END */ },
}};