From cff6d757e3ba609c08ef2aaa00f07e53551e5bf6 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 3 Jun 2024 07:11:10 +0200 Subject: Adding upstream version 3.0.0. Signed-off-by: Daniel Baumann --- src/stconn.c | 614 +++++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 517 insertions(+), 97 deletions(-) (limited to 'src/stconn.c') diff --git a/src/stconn.c b/src/stconn.c index df119a1..6077403 100644 --- a/src/stconn.c +++ b/src/stconn.c @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -99,6 +100,9 @@ void sedesc_init(struct sedesc *sedesc) sedesc->xref.peer = NULL; se_fl_setall(sedesc, SE_FL_NONE); + sedesc->abort_info.info = 0; + sedesc->abort_info.code = 0; + sedesc->iobuf.pipe = NULL; sedesc->iobuf.buf = NULL; sedesc->iobuf.offset = sedesc->iobuf.data = 0; @@ -130,6 +134,54 @@ void sedesc_free(struct sedesc *sedesc) } } +/* Performs a shutdown on the endpoint. This function deals with connection and + * applet endpoints. It is responsible to set SE flags corresponding to the + * given shut modes and to call right shutdown functions of the endpoint. It is + * called from the .abort and .shut app_ops callback functions at the SC level. + */ +void se_shutdown(struct sedesc *sedesc, enum se_shut_mode mode) +{ + if (se_fl_test(sedesc, SE_FL_T_MUX)) { + const struct mux_ops *mux = (sedesc->conn ? sedesc->conn->mux : NULL); + unsigned int flags = 0; + + if ((mode & (SE_SHW_SILENT|SE_SHW_NORMAL)) && !se_fl_test(sedesc, SE_FL_SHW)) + flags |= (mode & SE_SHW_NORMAL) ? SE_FL_SHWN : SE_FL_SHWS; + + + if ((mode & (SE_SHR_RESET|SE_SHR_DRAIN)) && !se_fl_test(sedesc, SE_FL_SHR)) + flags |= (mode & SE_SHR_DRAIN) ? SE_FL_SHRD : SE_FL_SHRR; + + if (flags) { + if (mux && mux->shut) { + struct se_abort_info *reason = NULL; + struct xref *peer = xref_get_peer_and_lock(&sedesc->xref); + + if (peer) { + struct sedesc *sdo = container_of(peer, struct sedesc, xref); + + reason = &sdo->abort_info; + xref_unlock(&sedesc->xref, peer); + } + + mux->shut(sedesc->sc, mode, reason); + + } + se_fl_set(sedesc, flags); + } + } + else if (se_fl_test(sedesc, SE_FL_T_APPLET)) { + if ((mode & (SE_SHW_SILENT|SE_SHW_NORMAL)) && !se_fl_test(sedesc, SE_FL_SHW)) + se_fl_set(sedesc, SE_FL_SHWN); + + if ((mode & (SE_SHR_RESET|SE_SHR_DRAIN)) && !se_fl_test(sedesc, SE_FL_SHR)) + se_fl_set(sedesc, SE_FL_SHRR); + + if (se_fl_test(sedesc, SE_FL_SHR) && se_fl_test(sedesc, SE_FL_SHW)) + appctx_shut(sedesc->se); + } +} + /* Tries to allocate a new stconn and initialize its main fields. On * failure, nothing is allocated and NULL is returned. It is an internal * function. The caller must, at least, set the SE_FL_ORPHAN or SE_FL_DETACHED @@ -312,15 +364,17 @@ int sc_attach_mux(struct stconn *sc, void *sd, void *ctx) * removed. This function is called by a stream when a backend applet is * registered. */ -static void sc_attach_applet(struct stconn *sc, void *sd) +static int sc_attach_applet(struct stconn *sc, struct appctx *appctx) { - sc->sedesc->se = sd; + sc->sedesc->se = appctx; sc_ep_set(sc, SE_FL_T_APPLET); sc_ep_clr(sc, SE_FL_DETACHED); if (sc_strm(sc)) { sc->app_ops = &sc_app_applet_ops; xref_create(&sc->sedesc->xref, &sc_opposite(sc)->sedesc->xref); } + + return 0; } /* Attaches a stconn to a app layer and sets the relevant @@ -402,7 +456,7 @@ static void sc_detach_endp(struct stconn **scp) sc_ep_set(sc, SE_FL_ORPHAN); sc->sedesc->sc = NULL; sc->sedesc = NULL; - appctx_shut(appctx); + se_shutdown(appctx->sedesc, SE_SHR_RESET|SE_SHW_NORMAL); appctx_free(appctx); } @@ -506,7 +560,10 @@ struct appctx *sc_applet_create(struct stconn *sc, struct applet *app) appctx = appctx_new_here(app, sc->sedesc); if (!appctx) return NULL; - sc_attach_applet(sc, appctx); + if (sc_attach_applet(sc, appctx) == -1) { + appctx_free_on_early_error(appctx); + return NULL; + } appctx->t->nice = __sc_strm(sc)->task->nice; applet_need_more_data(appctx); appctx_wakeup(appctx); @@ -612,21 +669,24 @@ static void sc_app_shut(struct stconn *sc) !(ic->flags & CF_DONT_READ)) return; - __fallthrough; + sc->state = SC_ST_DIS; + break; case SC_ST_CON: case SC_ST_CER: case SC_ST_QUE: case SC_ST_TAR: /* Note that none of these states may happen with applets */ sc->state = SC_ST_DIS; - __fallthrough; + break; default: - sc->flags &= ~SC_FL_NOLINGER; - sc->flags |= SC_FL_ABRT_DONE; - if (sc->flags & SC_FL_ISBACK) - __sc_strm(sc)->conn_exp = TICK_ETERNITY; + break; } + sc->flags &= ~SC_FL_NOLINGER; + sc->flags |= SC_FL_ABRT_DONE; + if (sc->flags & SC_FL_ISBACK) + __sc_strm(sc)->conn_exp = TICK_ETERNITY; + /* note that if the task exists, it must unregister itself once it runs */ if (!(sc->flags & SC_FL_DONT_WAKE)) task_wakeup(sc_strm_task(sc), TASK_WOKEN_IO); @@ -691,7 +751,7 @@ static void sc_app_abort_conn(struct stconn *sc) return; if (sc->flags & SC_FL_SHUT_DONE) { - sc_conn_shut(sc); + se_shutdown(sc->sedesc, SE_SHR_RESET|SE_SHW_SILENT); sc->state = SC_ST_DIS; if (sc->flags & SC_FL_ISBACK) __sc_strm(sc)->conn_exp = TICK_ETERNITY; @@ -725,51 +785,42 @@ static void sc_app_shut_conn(struct stconn *sc) switch (sc->state) { case SC_ST_RDY: case SC_ST_EST: + /* we have to shut before closing, otherwise some short messages * may never leave the system, especially when there are remaining * unread data in the socket input buffer, or when nolinger is set. * However, if SC_FL_NOLINGER is explicitly set, we know there is * no risk so we close both sides immediately. */ - if (sc->flags & SC_FL_NOLINGER) { - /* unclean data-layer shutdown, typically an aborted request - * or a forwarded shutdown from a client to a server due to - * option abortonclose. No need for the TLS layer to try to - * emit a shutdown message. - */ - sc_conn_shutw(sc, CO_SHW_SILENT); + if (!(sc->flags & (SC_FL_NOLINGER|SC_FL_EOS|SC_FL_ABRT_DONE)) && !(ic->flags & CF_DONT_READ)) { + se_shutdown(sc->sedesc, SE_SHW_NORMAL); + return; } - else { - /* clean data-layer shutdown. This only happens on the - * frontend side, or on the backend side when forwarding - * a client close in TCP mode or in HTTP TUNNEL mode - * while option abortonclose is set. We want the TLS - * layer to try to signal it to the peer before we close. - */ - sc_conn_shutw(sc, CO_SHW_NORMAL); - if (!(sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) && !(ic->flags & CF_DONT_READ)) - return; - } + se_shutdown(sc->sedesc, SE_SHR_RESET|((sc->flags & SC_FL_NOLINGER) ? SE_SHW_SILENT : SE_SHW_NORMAL)); + sc->state = SC_ST_DIS; + break; - __fallthrough; case SC_ST_CON: /* we may have to close a pending connection, and mark the * response buffer as abort */ - sc_conn_shut(sc); - __fallthrough; + se_shutdown(sc->sedesc, SE_SHR_RESET|SE_SHW_SILENT); + sc->state = SC_ST_DIS; + break; case SC_ST_CER: case SC_ST_QUE: case SC_ST_TAR: sc->state = SC_ST_DIS; - __fallthrough; + break; default: - sc->flags &= ~SC_FL_NOLINGER; - sc->flags |= SC_FL_ABRT_DONE; - if (sc->flags & SC_FL_ISBACK) - __sc_strm(sc)->conn_exp = TICK_ETERNITY; + break; } + + sc->flags &= ~SC_FL_NOLINGER; + sc->flags |= SC_FL_ABRT_DONE; + if (sc->flags & SC_FL_ISBACK) + __sc_strm(sc)->conn_exp = TICK_ETERNITY; } /* This function is used for inter-stream connector calls. It is called by the @@ -884,7 +935,7 @@ static void sc_app_abort_applet(struct stconn *sc) return; if (sc->flags & SC_FL_SHUT_DONE) { - appctx_shut(__sc_appctx(sc)); + se_shutdown(sc->sedesc, SE_SHR_RESET|SE_SHW_NORMAL); sc->state = SC_ST_DIS; if (sc->flags & SC_FL_ISBACK) __sc_strm(sc)->conn_exp = TICK_ETERNITY; @@ -920,6 +971,7 @@ static void sc_app_shut_applet(struct stconn *sc) switch (sc->state) { case SC_ST_RDY: case SC_ST_EST: + /* we have to shut before closing, otherwise some short messages * may never leave the system, especially when there are remaining * unread data in the socket input buffer, or when nolinger is set. @@ -927,24 +979,31 @@ static void sc_app_shut_applet(struct stconn *sc) * no risk so we close both sides immediately. */ if (!(sc->flags & (SC_FL_ERROR|SC_FL_NOLINGER|SC_FL_EOS|SC_FL_ABRT_DONE)) && - !(ic->flags & CF_DONT_READ)) + !(ic->flags & CF_DONT_READ)) { + se_shutdown(sc->sedesc, SE_SHW_NORMAL); return; + } + + se_shutdown(sc->sedesc, SE_SHR_RESET|SE_SHW_NORMAL); + sc->state = SC_ST_DIS; + break; - __fallthrough; case SC_ST_CON: case SC_ST_CER: case SC_ST_QUE: case SC_ST_TAR: /* Note that none of these states may happen with applets */ - appctx_shut(__sc_appctx(sc)); + se_shutdown(sc->sedesc, SE_SHR_RESET|SE_SHW_NORMAL); sc->state = SC_ST_DIS; - __fallthrough; + break; default: - sc->flags &= ~SC_FL_NOLINGER; - sc->flags |= SC_FL_ABRT_DONE; - if (sc->flags & SC_FL_ISBACK) - __sc_strm(sc)->conn_exp = TICK_ETERNITY; + break; } + + sc->flags &= ~SC_FL_NOLINGER; + sc->flags |= SC_FL_ABRT_DONE; + if (sc->flags & SC_FL_ISBACK) + __sc_strm(sc)->conn_exp = TICK_ETERNITY; } /* chk_rcv function for applets */ @@ -1095,6 +1154,7 @@ void sc_notify(struct stconn *sc) */ if (sc_ep_have_ff_data(sc_opposite(sc)) || (co_data(ic) && sc_ep_test(sco, SE_FL_WAIT_DATA) && + (!HAS_DATA_FILTERS(__sc_strm(sc), ic) || channel_input_data(ic) == 0) && (!(sc->flags & SC_FL_SND_EXP_MORE) || channel_full(ic, co_data(ic)) || channel_input_data(ic) == 0))) { int new_len, last_len; @@ -1185,7 +1245,6 @@ static void sc_conn_eos(struct stconn *sc) if (sc_cond_forward_shut(sc)) { /* we want to immediately forward this close to the write side */ /* force flag on ssl to keep stream in cache */ - sc_conn_shutw(sc, CO_SHW_SILENT); goto do_close; } @@ -1194,7 +1253,7 @@ static void sc_conn_eos(struct stconn *sc) do_close: /* OK we completely close the socket here just as if we went through sc_shut[rw]() */ - sc_conn_shut(sc); + se_shutdown(sc->sedesc, SE_SHR_RESET|SE_SHW_SILENT); sc->flags &= ~SC_FL_SHUT_WANTED; sc->flags |= SC_FL_SHUT_DONE; @@ -1253,17 +1312,7 @@ int sc_conn_recv(struct stconn *sc) /* prepare to detect if the mux needs more room */ sc_ep_clr(sc, SE_FL_WANT_ROOM); - if ((ic->flags & (CF_STREAMER | CF_STREAMER_FAST)) && !co_data(ic) && - global.tune.idle_timer && - (unsigned short)(now_ms - ic->last_read) >= global.tune.idle_timer) { - /* The buffer was empty and nothing was transferred for more - * than one second. This was caused by a pause and not by - * congestion. Reset any streaming mode to reduce latency. - */ - ic->xfer_small = 0; - ic->xfer_large = 0; - ic->flags &= ~(CF_STREAMER | CF_STREAMER_FAST); - } + channel_check_idletimer(ic); #if defined(USE_LINUX_SPLICE) /* Detect if the splicing is possible depending on the stream policy */ @@ -1448,41 +1497,7 @@ int sc_conn_recv(struct stconn *sc) if (!cur_read) se_have_no_more_data(sc->sedesc); else { - if ((ic->flags & (CF_STREAMER | CF_STREAMER_FAST)) && - (cur_read <= ic->buf.size / 2)) { - ic->xfer_large = 0; - ic->xfer_small++; - if (ic->xfer_small >= 3) { - /* we have read less than half of the buffer in - * one pass, and this happened at least 3 times. - * This is definitely not a streamer. - */ - ic->flags &= ~(CF_STREAMER | CF_STREAMER_FAST); - } - else if (ic->xfer_small >= 2) { - /* if the buffer has been at least half full twice, - * we receive faster than we send, so at least it - * is not a "fast streamer". - */ - ic->flags &= ~CF_STREAMER_FAST; - } - } - else if (!(ic->flags & CF_STREAMER_FAST) && (cur_read >= channel_data_limit(ic))) { - /* we read a full buffer at once */ - ic->xfer_small = 0; - ic->xfer_large++; - if (ic->xfer_large >= 3) { - /* we call this buffer a fast streamer if it manages - * to be filled in one call 3 consecutive times. - */ - ic->flags |= (CF_STREAMER | CF_STREAMER_FAST); - } - } - else { - ic->xfer_small = 0; - ic->xfer_large = 0; - } - ic->last_read = now_ms; + channel_check_xfer(ic, cur_read); sc_ep_report_read_activity(sc); } @@ -1660,7 +1675,7 @@ int sc_conn_send(struct stconn *sc) if (s->txn->req.msg_state != HTTP_MSG_DONE) s->txn->flags &= ~TX_L7_RETRY; else { - if (b_alloc(&s->txn->l7_buffer) == NULL) + if (b_alloc(&s->txn->l7_buffer, DB_UNLIKELY) == NULL) s->txn->flags &= ~TX_L7_RETRY; else { memcpy(b_orig(&s->txn->l7_buffer), @@ -1673,6 +1688,9 @@ int sc_conn_send(struct stconn *sc) } } + if ((sc->flags & SC_FL_SHUT_WANTED) && co_data(oc) == c_data(oc)) + send_flag |= CO_SFL_LAST_DATA; + ret = conn->mux->snd_buf(sc, &oc->buf, co_data(oc), send_flag); if (ret > 0) { did_send = 1; @@ -1899,7 +1917,7 @@ static void sc_applet_eos(struct stconn *sc) return; if (sc->flags & SC_FL_SHUT_DONE) { - appctx_shut(__sc_appctx(sc)); + se_shutdown(sc->sedesc, SE_SHR_RESET|SE_SHW_NORMAL); sc->state = SC_ST_DIS; if (sc->flags & SC_FL_ISBACK) __sc_strm(sc)->conn_exp = TICK_ETERNITY; @@ -1908,6 +1926,352 @@ static void sc_applet_eos(struct stconn *sc) return sc_app_shut_applet(sc); } +/* + * This is the callback which is called by the applet layer to receive data into + * the buffer from the appctx. It iterates over the applet's rcv_buf + * function. Please do not statify this function, it's often present in + * backtraces, it's useful to recognize it. + */ +int sc_applet_recv(struct stconn *sc) +{ + struct appctx *appctx = __sc_appctx(sc); + struct channel *ic = sc_ic(sc); + int ret, max, cur_read = 0; + int read_poll = MAX_READ_POLL_LOOPS; + int flags = 0; + + + /* If another call to sc_applet_recv() failed, give up now. + */ + if (sc_waiting_room(sc)) + return 0; + + /* maybe we were called immediately after an asynchronous abort */ + if (sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) + return 1; + + /* We must wait because the applet is not fully initialized */ + if (se_fl_test(sc->sedesc, SE_FL_ORPHAN)) + return 0; + + /* stop immediately on errors. */ + if (!sc_ep_test(sc, SE_FL_RCV_MORE)) { + // TODO: be sure SE_FL_RCV_MORE may be set for applet ? + if (sc_ep_test(sc, SE_FL_ERROR)) + goto end_recv; + } + + /* prepare to detect if the mux needs more room */ + sc_ep_clr(sc, SE_FL_WANT_ROOM); + + channel_check_idletimer(ic); + + /* First, let's see if we may fast-forward data from a side to the other + * one without using the channel buffer. + */ + if (sc_is_fastfwd_supported(sc)) { + if (channel_data(ic)) { + /* We're embarrassed, there are already data pending in + * the buffer and we don't want to have them at two + * locations at a time. Let's indicate we need some + * place and ask the consumer to hurry. + */ + flags |= CO_RFL_BUF_FLUSH; + goto abort_fastfwd; + } + ret = appctx_fastfwd(sc, ic->to_forward, flags); + if (ret < 0) + goto abort_fastfwd; + else if (ret > 0) { + if (ic->to_forward != CHN_INFINITE_FORWARD) + ic->to_forward -= ret; + ic->total += ret; + cur_read += ret; + ic->flags |= CF_READ_EVENT; + } + + if (sc_ep_test(sc, SE_FL_EOS | SE_FL_ERROR)) + goto end_recv; + + if (sc_ep_test(sc, SE_FL_WANT_ROOM)) + sc_need_room(sc, -1); + + if (sc_ep_test(sc, SE_FL_MAY_FASTFWD_PROD) && ic->to_forward) + goto done_recv; + } + + abort_fastfwd: + if (!sc_alloc_ibuf(sc, &appctx->buffer_wait)) + goto end_recv; + + /* For an HTX stream, if the buffer is stuck (no output data with some + * input data) and if the HTX message is fragmented or if its free space + * wraps, we force an HTX deframentation. It is a way to have a + * contiguous free space nad to let the mux to copy as much data as + * possible. + * + * NOTE: A possible optim may be to let the mux decides if defrag is + * required or not, depending on amount of data to be xferred. + */ + if (IS_HTX_STRM(__sc_strm(sc)) && !co_data(ic)) { + struct htx *htx = htxbuf(&ic->buf); + + if (htx_is_not_empty(htx) && ((htx->flags & HTX_FL_FRAGMENTED) || htx_space_wraps(htx))) + htx_defrag(htx, NULL, 0); + } + + /* Compute transient CO_RFL_* flags */ + if (co_data(ic)) { + flags |= (CO_RFL_BUF_WET | CO_RFL_BUF_NOT_STUCK); + } + + /* may be null. This is the mux responsibility to set + * SE_FL_RCV_MORE on the SC if more space is needed. + */ + max = channel_recv_max(ic); + ret = appctx_rcv_buf(sc, &ic->buf, max, flags); + if (sc_ep_test(sc, SE_FL_WANT_ROOM)) { + /* SE_FL_WANT_ROOM must not be reported if the channel's + * buffer is empty. + */ + BUG_ON(c_empty(ic)); + + sc_need_room(sc, channel_recv_max(ic) + 1); + /* Add READ_PARTIAL because some data are pending but + * cannot be xferred to the channel + */ + ic->flags |= CF_READ_EVENT; + sc_ep_report_read_activity(sc); + } + + if (ret <= 0) { + /* if we refrained from reading because we asked for a flush to + * satisfy rcv_pipe(), report that there's not enough room here + * to proceed. + */ + if (flags & CO_RFL_BUF_FLUSH) + sc_need_room(sc, -1); + goto done_recv; + } + + cur_read += ret; + + /* if we're allowed to directly forward data, we must update ->o */ + if (ic->to_forward && !(sc_opposite(sc)->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED))) { + unsigned long fwd = ret; + if (ic->to_forward != CHN_INFINITE_FORWARD) { + if (fwd > ic->to_forward) + fwd = ic->to_forward; + ic->to_forward -= fwd; + } + c_adv(ic, fwd); + } + + ic->flags |= CF_READ_EVENT; + ic->total += ret; + + /* End-of-input reached, we can leave. In this case, it is + * important to break the loop to not block the SC because of + * the channel's policies.This way, we are still able to receive + * shutdowns. + */ + if (sc_ep_test(sc, SE_FL_EOI)) + goto done_recv; + + if ((sc->flags & SC_FL_RCV_ONCE) || --read_poll <= 0) { + /* we don't expect to read more data */ + sc_wont_read(sc); + goto done_recv; + } + + /* if too many bytes were missing from last read, it means that + * it's pointless trying to read again because the system does + * not have them in buffers. + */ + if (ret < max) { + /* if a streamer has read few data, it may be because we + * have exhausted system buffers. It's not worth trying + * again. + */ + if (ic->flags & CF_STREAMER) { + /* we're stopped by the channel's policy */ + sc_wont_read(sc); + goto done_recv; + } + + /* if we read a large block smaller than what we requested, + * it's almost certain we'll never get anything more. + */ + if (ret >= global.tune.recv_enough) { + /* we're stopped by the channel's policy */ + sc_wont_read(sc); + } + } + + done_recv: + if (cur_read) { + channel_check_xfer(ic, cur_read); + sc_ep_report_read_activity(sc); + } + + end_recv: + ret = (cur_read != 0); + + /* Report EOI on the channel if it was reached from the mux point of + * view. */ + if (sc_ep_test(sc, SE_FL_EOI) && !(sc->flags & SC_FL_EOI)) { + sc_ep_report_read_activity(sc); + sc->flags |= SC_FL_EOI; + ic->flags |= CF_READ_EVENT; + ret = 1; + } + + if (sc_ep_test(sc, SE_FL_EOS)) { + /* we received a shutdown */ + if (ic->flags & CF_AUTO_CLOSE) + sc_schedule_shutdown(sc_opposite(sc)); + sc_applet_eos(sc); + ret = 1; + } + + if (sc_ep_test(sc, SE_FL_ERROR)) { + sc->flags |= SC_FL_ERROR; + ret = 1; + } + else if (cur_read || (sc->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM))) { + se_have_more_data(sc->sedesc); + ret = 1; + } + + return ret; +} + +/* This tries to perform a synchronous receive on the stream connector to + * try to collect last arrived data. In practice it's only implemented on + * stconns. Returns 0 if nothing was done, non-zero if new data or a + * shutdown were collected. This may result on some delayed receive calls + * to be programmed and performed later, though it doesn't provide any + * such guarantee. + */ +int sc_applet_sync_recv(struct stconn *sc) +{ + if (!(__sc_appctx(sc)->flags & APPCTX_FL_INOUT_BUFS)) + return 0; + + if (!sc_state_in(sc->state, SC_SB_RDY|SC_SB_EST)) + return 0; + + if (se_fl_test(sc->sedesc, SE_FL_ORPHAN)) + return 0; + + if (!sc_is_recv_allowed(sc)) + return 0; // already failed + + return sc_applet_recv(sc); +} + +/* + * This function is called to send buffer data to an applet. It calls the + * applet's snd_buf function. Please do not statify this function, it's often + * present in backtraces, it's useful to recognize it. + */ +int sc_applet_send(struct stconn *sc) +{ + struct stconn *sco = sc_opposite(sc); + struct channel *oc = sc_oc(sc); + size_t ret; + int did_send = 0; + + if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING)) { + BUG_ON(sc_ep_test(sc, SE_FL_EOS|SE_FL_ERROR|SE_FL_ERR_PENDING) == (SE_FL_EOS|SE_FL_ERR_PENDING)); + return 1; + } + + if (sc_ep_test(sc, SE_FL_WONT_CONSUME)) + return 0; + + /* we might have been called just after an asynchronous shutw */ + if (sc->flags & SC_FL_SHUT_DONE) + return 1; + + /* We must wait because the applet is not fully initialized */ + if (se_fl_test(sc->sedesc, SE_FL_ORPHAN)) + return 0; + + /* TODO: Splicing is not supported, so it is not possible to have FF data stuck into the I/O buf */ + BUG_ON(sc_ep_have_ff_data(sc)); + + if (co_data(oc)) { + unsigned int send_flag = 0; + + if ((sc->flags & SC_FL_SHUT_WANTED) && co_data(oc) == c_data(oc)) + send_flag |= CO_SFL_LAST_DATA; + + ret = appctx_snd_buf(sc, &oc->buf, co_data(oc), send_flag); + if (ret > 0) { + did_send = 1; + c_rew(oc, ret); + c_realign_if_empty(oc); + + if (!co_data(oc)) { + /* Always clear both flags once everything has been sent, they're one-shot */ + sc->flags &= ~(SC_FL_SND_ASAP|SC_FL_SND_EXP_MORE); + } + /* if some data remain in the buffer, it's only because the + * system buffers are full, we will try next time. + */ + } + } + + if (did_send) + oc->flags |= CF_WRITE_EVENT | CF_WROTE_DATA; + + if (!sco->room_needed || (did_send && (sco->room_needed < 0 || channel_recv_max(sc_oc(sc)) >= sco->room_needed))) + sc_have_room(sco); + + if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING)) { + oc->flags |= CF_WRITE_EVENT; + BUG_ON(sc_ep_test(sc, SE_FL_EOS|SE_FL_ERROR|SE_FL_ERR_PENDING) == (SE_FL_EOS|SE_FL_ERR_PENDING)); + if (sc_ep_test(sc, SE_FL_ERROR)) + sc->flags |= SC_FL_ERROR; + return 1; + } + + if (!co_data(oc)) { + if (did_send) + sc_ep_report_send_activity(sc); + } + else { + sc_ep_report_blocked_send(sc, did_send); + } + + return did_send; +} + +void sc_applet_sync_send(struct stconn *sc) +{ + struct channel *oc = sc_oc(sc); + + oc->flags &= ~CF_WRITE_EVENT; + + if (!(__sc_appctx(sc)->flags & APPCTX_FL_INOUT_BUFS)) + return; + + if (sc->flags & SC_FL_SHUT_DONE) + return; + + if (!co_data(oc)) + return; + + if (!sc_state_in(sc->state, SC_SB_EST)) + return; + + if (se_fl_test(sc->sedesc, SE_FL_ORPHAN)) + return; + + sc_applet_send(sc); +} + /* Callback to be used by applet handlers upon completion. It updates the stream * (which may or may not take this opportunity to try to forward data), then * may re-enable the applet's based on the channels and stream connector's final @@ -1960,7 +2324,8 @@ int sc_applet_process(struct stconn *sc) * appctx but in the case the task is not in runqueue we may have to * wakeup the appctx immediately. */ - if (sc_is_recv_allowed(sc) || sc_is_send_allowed(sc)) + if ((sc_is_recv_allowed(sc) && !applet_fl_test(__sc_appctx(sc), APPCTX_FL_OUTBLK_ALLOC)) || + (sc_is_send_allowed(sc) && !applet_fl_test(__sc_appctx(sc), APPCTX_FL_INBLK_ALLOC))) appctx_wakeup(__sc_appctx(sc)); return 0; } @@ -2036,6 +2401,57 @@ smp_fetch_sid(const struct arg *args, struct sample *smp, const char *kw, void * return 1; } +/* return 1 if the frontend or backend mux stream has received an abort and 0 otherwise. + */ +static int +smp_fetch_strm_aborted(const struct arg *args, struct sample *smp, const char *kw, void *private) +{ + struct stconn *sc; + unsigned int aborted = 0; + + if (!smp->strm) + return 0; + + sc = (kw[0] == 'f' ? smp->strm->scf : smp->strm->scb); + if (sc->sedesc->abort_info.info) + aborted = 1; + + smp->flags = SMP_F_VOL_TXN; + smp->data.type = SMP_T_BOOL; + smp->data.u.sint = aborted; + + return 1; +} + +/* return the H2/QUIC RESET code of the frontend or backend mux stream. Any value + * means an a RST_STREAM was received on H2 and a STOP_SENDING on QUIC. Otherwise the sample fetch fails. + */ +static int +smp_fetch_strm_rst_code(const struct arg *args, struct sample *smp, const char *kw, void *private) +{ + struct stconn *sc; + unsigned int source; + unsigned long long code = 0; + + if (!smp->strm) + return 0; + + sc = (kw[0] == 'f' ? smp->strm->scf : smp->strm->scb); + source = ((sc->sedesc->abort_info.info & SE_ABRT_SRC_MASK) >> SE_ABRT_SRC_SHIFT); + if (source != SE_ABRT_SRC_MUX_H2 && source != SE_ABRT_SRC_MUX_QUIC) { + if (!source) + smp->flags |= SMP_F_MAY_CHANGE; + return 0; + } + code = sc->sedesc->abort_info.code; + + smp->flags = SMP_F_VOL_TXN; + smp->data.type = SMP_T_SINT; + smp->data.u.sint = code; + + return 1; +} + /* Note: must not be declared as its list will be overwritten. * Note: fetches that may return multiple types should be declared using the * appropriate pseudo-type. If not available it must be declared as the lowest @@ -2043,7 +2459,11 @@ smp_fetch_sid(const struct arg *args, struct sample *smp, const char *kw, void * */ static struct sample_fetch_kw_list sample_fetch_keywords = {ILH, { { "bs.id", smp_fetch_sid, 0, NULL, SMP_T_SINT, SMP_USE_L6REQ }, + { "bs.aborted", smp_fetch_strm_aborted, 0, NULL, SMP_T_SINT, SMP_USE_L5SRV }, + { "bs.rst_code", smp_fetch_strm_rst_code, 0, NULL, SMP_T_SINT, SMP_USE_L5SRV }, { "fs.id", smp_fetch_sid, 0, NULL, SMP_T_STR, SMP_USE_L6RES }, + { "fs.aborted", smp_fetch_strm_aborted, 0, NULL, SMP_T_SINT, SMP_USE_L5CLI }, + { "fs.rst_code", smp_fetch_strm_rst_code, 0, NULL, SMP_T_SINT, SMP_USE_L5CLI }, { /* END */ }, }}; -- cgit v1.2.3