Diffstat (limited to 'src/mux_quic.c')
-rw-r--r--  src/mux_quic.c  853
1 file changed, 530 insertions, 323 deletions
diff --git a/src/mux_quic.c b/src/mux_quic.c
index 05c92fa..ae504ee 100644
--- a/src/mux_quic.c
+++ b/src/mux_quic.c
@@ -3,6 +3,7 @@
#include <import/eb64tree.h>
#include <haproxy/api.h>
+#include <haproxy/chunk.h>
#include <haproxy/connection.h>
#include <haproxy/dynbuf.h>
#include <haproxy/h3.h>
@@ -13,6 +14,7 @@
#include <haproxy/qmux_http.h>
#include <haproxy/qmux_trace.h>
#include <haproxy/quic_conn.h>
+#include <haproxy/quic_fctl.h>
#include <haproxy/quic_frame.h>
#include <haproxy/quic_sock.h>
#include <haproxy/quic_stream.h>
@@ -58,6 +60,8 @@ static void qcs_free(struct qcs *qcs)
/* Safe to use even if already removed from the list. */
LIST_DEL_INIT(&qcs->el_opening);
LIST_DEL_INIT(&qcs->el_send);
+ LIST_DEL_INIT(&qcs->el_fctl);
+ LIST_DEL_INIT(&qcs->el_buf);
/* Release stream endpoint descriptor. */
BUG_ON(qcs->sd && !se_fl_test(qcs->sd, SE_FL_ORPHAN));
@@ -68,11 +72,10 @@ static void qcs_free(struct qcs *qcs)
qcc->app_ops->detach(qcs);
/* Release qc_stream_desc buffer from quic-conn layer. */
- qc_stream_desc_release(qcs->stream, qcs->tx.sent_offset);
+ qc_stream_desc_release(qcs->stream, qcs->tx.fc.off_real);
- /* Free Rx/Tx buffers. */
+ /* Free Rx buffer. */
qcs_free_ncbuf(qcs, &qcs->rx.ncbuf);
- b_free(&qcs->tx.buf);
/* Remove qcs from qcc tree. */
eb64_delete(&qcs->by_id);
@@ -97,34 +100,45 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
qcs->stream = NULL;
qcs->qcc = qcc;
- qcs->sd = NULL;
qcs->flags = QC_SF_NONE;
qcs->st = QC_SS_IDLE;
qcs->ctx = NULL;
+ qcs->sd = sedesc_new();
+ if (!qcs->sd)
+ goto err;
+ qcs->sd->se = qcs;
+ qcs->sd->conn = qcc->conn;
+ se_fl_set(qcs->sd, SE_FL_T_MUX | SE_FL_ORPHAN | SE_FL_NOT_FIRST);
+ se_expect_no_data(qcs->sd);
+
+ if (!(global.tune.no_zero_copy_fwd & NO_ZERO_COPY_FWD_QUIC_SND))
+ se_fl_set(qcs->sd, SE_FL_MAY_FASTFWD_CONS);
+
/* App callback attach may register the stream for http-request wait.
* These fields must be initialized before.
*/
LIST_INIT(&qcs->el_opening);
LIST_INIT(&qcs->el_send);
+ LIST_INIT(&qcs->el_fctl);
+ LIST_INIT(&qcs->el_buf);
qcs->start = TICK_ETERNITY;
/* store transport layer stream descriptor in qcc tree */
qcs->id = qcs->by_id.key = id;
eb64_insert(&qcc->streams_by_id, &qcs->by_id);
- /* If stream is local, use peer remote-limit, or else the opposite. */
+ /* Different limits can be set by the peer for local and remote bidi streams. */
if (quic_stream_is_bidi(id)) {
- qcs->tx.msd = quic_stream_is_local(qcc, id) ? qcc->rfctl.msd_bidi_r :
- qcc->rfctl.msd_bidi_l;
+ qfctl_init(&qcs->tx.fc, quic_stream_is_local(qcc, id) ?
+ qcc->rfctl.msd_bidi_r : qcc->rfctl.msd_bidi_l);
}
else if (quic_stream_is_local(qcc, id)) {
- qcs->tx.msd = qcc->rfctl.msd_uni_l;
+ qfctl_init(&qcs->tx.fc, qcc->rfctl.msd_uni_l);
+ }
+ else {
+ qfctl_init(&qcs->tx.fc, 0);
}
-
- /* Properly set flow-control blocking if initial MSD is nul. */
- if (!qcs->tx.msd)
- qcs->flags |= QC_SF_BLK_SFCTL;
qcs->rx.ncbuf = NCBUF_NULL;
qcs->rx.app_buf = BUF_NULL;
@@ -139,10 +153,6 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
}
qcs->rx.msd_init = qcs->rx.msd;
- qcs->tx.buf = BUF_NULL;
- qcs->tx.offset = 0;
- qcs->tx.sent_offset = 0;
-
qcs->wait_event.tasklet = NULL;
qcs->wait_event.events = 0;
qcs->subs = NULL;
@@ -423,15 +433,6 @@ int qcs_is_close_remote(struct qcs *qcs)
return qcs->st == QC_SS_HREM || qcs->st == QC_SS_CLO;
}
-/* Allocate if needed buffer <bptr> for stream <qcs>.
- *
- * Returns the buffer instance or NULL on allocation failure.
- */
-struct buffer *qcs_get_buf(struct qcs *qcs, struct buffer *bptr)
-{
- return b_alloc(bptr);
-}
-
/* Allocate if needed buffer <ncbuf> for stream <qcs>.
*
* Returns the buffer instance or NULL on allocation failure.
@@ -441,7 +442,7 @@ static struct ncbuf *qcs_get_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
struct buffer buf = BUF_NULL;
if (ncb_is_null(ncbuf)) {
- if (!b_alloc(&buf))
+ if (!b_alloc(&buf, DB_MUX_RX))
return NULL;
*ncbuf = ncb_make(buf.area, buf.size, 0);
@@ -511,6 +512,35 @@ void qcs_notify_send(struct qcs *qcs)
}
}
+/* Notify that a new stream-desc buffer is available for the <qcc> connection.
+ *
+ * Returns true if a stream was woken up. If false is returned, this indicates
+ * to the caller that it's currently unnecessary to notify for the rest of the
+ * available buffers.
+ */
+int qcc_notify_buf(struct qcc *qcc)
+{
+ struct qcs *qcs;
+ int ret = 0;
+
+ TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
+
+ if (qcc->flags & QC_CF_CONN_FULL) {
+ TRACE_STATE("new stream desc buffer available", QMUX_EV_QCC_WAKE, qcc->conn);
+ qcc->flags &= ~QC_CF_CONN_FULL;
+ }
+
+ if (!LIST_ISEMPTY(&qcc->buf_wait_list)) {
+ qcs = LIST_ELEM(qcc->buf_wait_list.n, struct qcs *, el_buf);
+ LIST_DEL_INIT(&qcs->el_buf);
+ qcs_notify_send(qcs);
+ ret = 1;
+ }
+
+ TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn);
+ return ret;
+}
+
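A hedged usage sketch (editor's illustration, not part of the patch): since a zero return documents that no other stream is waiting, a lower layer releasing several stream-desc buffers could stop notifying early. The <released> counter below is an assumption.

    /* Hypothetical caller loop: stop as soon as qcc_notify_buf() reports
     * that no stream is waiting for a buffer anymore. */
    while (released-- && qcc_notify_buf(qcc))
        ;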
/* A fatal error is detected locally for <qcc> connection. It should be closed
* with a CONNECTION_CLOSE using <err> code. Set <app> to true to indicate that
* the code must be considered as an application level error. This function
@@ -536,6 +566,28 @@ void qcc_set_error(struct qcc *qcc, int err, int app)
tasklet_wakeup(qcc->wait_event.tasklet);
}
+/* Increment the glitch counter for <qcc> connection by <inc> steps. If the
+ * configured threshold is reached, close the connection with an error code.
+ */
+int qcc_report_glitch(struct qcc *qcc, int inc)
+{
+ const int max = global.tune.quic_frontend_glitches_threshold;
+
+ qcc->glitches += inc;
+ if (max && qcc->glitches >= max && !(qcc->flags & QC_CF_ERRL)) {
+ if (qcc->app_ops->report_susp) {
+ qcc->app_ops->report_susp(qcc->ctx);
+ qcc_set_error(qcc, qcc->err.code, 1);
+ }
+ else {
+ qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0);
+ }
+ return 1;
+ }
+
+ return 0;
+}
+
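Illustrative only: a hedged sketch of how an application layer might account a tolerated protocol deviation; the surrounding variable names are assumptions.

    /* Sketch: count one glitch for a dubious but non-fatal frame; a non-zero
     * return means the threshold was crossed and the connection is already
     * scheduled for closure, so parsing should stop. */
    if (qcc_report_glitch(qcs->qcc, 1))
        return -1;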
/* Open a locally initiated stream for the connection <qcc>. Set <bidi> for a
* bidirectional stream, else an unidirectional stream is opened. The next
* available ID on the connection will be used according to the stream type.
@@ -650,17 +702,6 @@ struct stconn *qcs_attach_sc(struct qcs *qcs, struct buffer *buf, char fin)
struct qcc *qcc = qcs->qcc;
struct session *sess = qcc->conn->owner;
- qcs->sd = sedesc_new();
- if (!qcs->sd)
- return NULL;
-
- qcs->sd->se = qcs;
- qcs->sd->conn = qcc->conn;
- se_fl_set(qcs->sd, SE_FL_T_MUX | SE_FL_ORPHAN | SE_FL_NOT_FIRST);
- se_expect_no_data(qcs->sd);
-
- if (!(global.tune.no_zero_copy_fwd & NO_ZERO_COPY_FWD_QUIC_SND))
- se_fl_set(qcs->sd, SE_FL_MAY_FASTFWD_CONS);
/* TODO duplicated from mux_h2 */
sess->t_idle = ns_to_ms(now_ns - sess->accept_ts) - sess->t_handshake;
@@ -899,7 +940,7 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
fin = 1;
if (!(qcs->flags & QC_SF_READ_ABORTED)) {
- ret = qcc->app_ops->decode_qcs(qcs, &b, fin);
+ ret = qcc->app_ops->rcv_buf(qcs, &b, fin);
if (ret < 0) {
TRACE_ERROR("decoding error", QMUX_EV_QCS_RECV, qcc->conn, qcs);
goto err;
@@ -930,25 +971,170 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
return 1;
}
+/* Allocate if needed and retrieve <qcs> stream buffer for data reception.
+ *
+ * Returns buffer pointer. May be NULL on allocation failure.
+ */
+struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs)
+{
+ return b_alloc(&qcs->rx.app_buf, DB_MUX_RX);
+}
+
+/* Allocate if needed and retrieve <qcs> stream buffer for data emission.
+ *
+ * <err> is an output argument which is useful to differentiate the failure
+ * cause when the buffer cannot be allocated. It is set to 0 if the connection
+ * buffer limit is reached. For fatal errors, its value is non-zero.
+ *
+ * Returns buffer pointer. May be NULL on allocation failure, in which case
+ * <err> will refer to the cause.
+ */
+struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err)
+{
+ struct qcc *qcc = qcs->qcc;
+ int buf_avail;
+ struct buffer *out = qc_stream_buf_get(qcs->stream);
+
+ /* Stream must not try to reallocate a buffer if currently waiting for one. */
+ BUG_ON(LIST_INLIST(&qcs->el_buf));
+
+ *err = 0;
+
+ if (!out) {
+ if (qcc->flags & QC_CF_CONN_FULL) {
+ LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf);
+ goto out;
+ }
+
+ out = qc_stream_buf_alloc(qcs->stream, qcs->tx.fc.off_real,
+ &buf_avail);
+ if (!out) {
+ if (buf_avail) {
+ TRACE_ERROR("stream desc alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+ *err = 1;
+ goto out;
+ }
+
+ TRACE_STATE("hitting stream desc buffer limit", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+ LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf);
+ qcc->flags |= QC_CF_CONN_FULL;
+ goto out;
+ }
+
+ if (!b_alloc(out, DB_MUX_TX)) {
+ TRACE_ERROR("buffer alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+ *err = 1;
+ goto out;
+ }
+ }
+
+ out:
+ return out;
+}
+
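A minimal caller sketch, assuming an application encoder that needs a Tx buffer; it only interprets the NULL/<err> combinations documented above.

    int err;
    struct buffer *out = qcc_get_stream_txbuf(qcs, &err);
    if (!out) {
        if (err)
            return -1;  /* fatal allocation error: close the connection */
        return 0;       /* buffer limit reached: QCS is queued on buf_wait_list
                         * and qcc_notify_buf() will wake it up later */
    }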
+/* Returns total number of bytes not already sent to quic-conn layer. */
+static uint64_t qcs_prep_bytes(const struct qcs *qcs)
+{
+ struct buffer *out = qc_stream_buf_get(qcs->stream);
+ uint64_t diff, base_off;
+
+ if (!out)
+ return 0;
+
+ /* if ack_offset < buf_offset, it points to an older buffer. */
+ base_off = MAX(qcs->stream->buf_offset, qcs->stream->ack_offset);
+ diff = qcs->tx.fc.off_real - base_off;
+ return b_data(out) - diff;
+}
+
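A worked example with hypothetical numbers may help:

    /* Example: buf_offset=1000, ack_offset=700, off_real=1150 and
     * b_data(out)=400. base_off = MAX(1000, 700) = 1000 and
     * diff = 1150 - 1000 = 150 bytes of <out> already sent, leaving
     * 400 - 150 = 250 bytes prepared but not yet emitted. */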
+/* Try to realign <out> buffer for <qcs> stream. This is done only if there is
+ * no data waiting for ACK.
+ *
+ * Returns 0 if realign was performed else non-zero.
+ */
+int qcc_realign_stream_txbuf(const struct qcs *qcs, struct buffer *out)
+{
+ if (qcs_prep_bytes(qcs) == b_data(out)) {
+ b_slow_realign(out, trash.area, b_data(out));
+ return 0;
+ }
+
+ return 1;
+}
+
+/* Release the current <qcs> Tx buffer. This is useful if space left is not
+ * enough anymore. A new instance can then be allocated to continue sending.
+ *
+ * This operation fails if there are prepared bytes not yet sent in the
+ * buffer. In this case, the stream layer should interrupt sending until
+ * further notification.
+ *
+ * Returns 0 if buffer is released and a new one can be allocated or non-zero
+ * if there is still remaining data.
+ */
+int qcc_release_stream_txbuf(struct qcs *qcs)
+{
+ const uint64_t bytes = qcs_prep_bytes(qcs);
+
+ /* Cannot release buffer if prepared data is not fully sent. */
+ if (bytes) {
+ qcs->flags |= QC_SF_BLK_MROOM;
+ return 1;
+ }
+
+ qc_stream_buf_release(qcs->stream);
+ return 0;
+}
+
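Hedged sketch of the intended caller pattern; <needed> and the surrounding control flow are assumptions for illustration.

    /* When the current buffer lacks room for the next chunk, try to recycle
     * it. On failure QC_SF_BLK_MROOM is set and emission must pause until
     * qcc_streams_sent_done() releases the buffer and wakes the stream. */
    if (b_room(out) < needed) {
        if (qcc_release_stream_txbuf(qcs))
            return 0;                           /* blocked, wait for wakeup */
        out = qcc_get_stream_txbuf(qcs, &err);  /* start a fresh buffer */
    }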
+/* Returns true if stream layer can proceed to emission via <qcs>. */
+int qcc_stream_can_send(const struct qcs *qcs)
+{
+ return !(qcs->flags & QC_SF_BLK_MROOM) && !LIST_INLIST(&qcs->el_buf);
+}
+
+/* Wakes up every stream of <qcc> which is currently waiting to send but is
+ * blocked on connection flow control.
+ */
+static void qcc_notify_fctl(struct qcc *qcc)
+{
+ struct qcs *qcs;
+
+ while (!LIST_ISEMPTY(&qcc->fctl_list)) {
+ qcs = LIST_ELEM(qcc->fctl_list.n, struct qcs *, el_fctl);
+ LIST_DEL_INIT(&qcs->el_fctl);
+ qcs_notify_send(qcs);
+ }
+}
+
/* Prepare for the emission of RESET_STREAM on <qcs> with error code <err>. */
void qcc_reset_stream(struct qcs *qcs, int err)
{
struct qcc *qcc = qcs->qcc;
+ const uint64_t diff = qcs_prep_bytes(qcs);
if ((qcs->flags & QC_SF_TO_RESET) || qcs_is_close_local(qcs))
return;
+ /* TODO if QCS waiting for buffer, it could be removed from
+ * <qcc.buf_wait_list> if sending is closed now.
+ */
+
TRACE_STATE("reset stream", QMUX_EV_QCS_END, qcc->conn, qcs);
qcs->flags |= QC_SF_TO_RESET;
qcs->err = err;
- /* Remove prepared stream data from connection flow-control calcul. */
- if (qcs->tx.offset > qcs->tx.sent_offset) {
- const uint64_t diff = qcs->tx.offset - qcs->tx.sent_offset;
- BUG_ON(qcc->tx.offsets - diff < qcc->tx.sent_offsets);
- qcc->tx.offsets -= diff;
- /* Reset qcs offset to prevent BUG_ON() on qcs_destroy(). */
- qcs->tx.offset = qcs->tx.sent_offset;
+ if (diff) {
+ const int soft_blocked = qfctl_sblocked(&qcc->tx.fc);
+
+ /* Soft offset cannot be lower than the real one. */
+ BUG_ON(qcc->tx.fc.off_soft - diff < qcc->tx.fc.off_real);
+
+ /* Subtract from the connection flow control the amount of data prepared on the stream but not yet sent. */
+ qcc->tx.fc.off_soft -= diff;
+ if (soft_blocked && !qfctl_sblocked(&qcc->tx.fc))
+ qcc_notify_fctl(qcc);
+
+ /* Reset QCS soft off to prevent BUG_ON() on qcs_destroy(). */
+ qcs->tx.fc.off_soft = qcs->tx.fc.off_real;
}
/* Report send error to stream-endpoint layer. */
@@ -957,15 +1143,16 @@ void qcc_reset_stream(struct qcs *qcs, int err)
qcs_alert(qcs);
}
- qcc_send_stream(qcs, 1);
+ qcc_send_stream(qcs, 1, 0);
tasklet_wakeup(qcc->wait_event.tasklet);
}
/* Register <qcs> stream for emission of STREAM, STOP_SENDING or RESET_STREAM.
* Set <urg> to 1 if stream content should be treated in priority compared to
- * other streams.
+ * other streams. For STREAM emission, <count> must contain the size of the
+ * frame payload. This is used for flow control accounting.
*/
-void qcc_send_stream(struct qcs *qcs, int urg)
+void qcc_send_stream(struct qcs *qcs, int urg, int count)
{
struct qcc *qcc = qcs->qcc;
@@ -983,6 +1170,11 @@ void qcc_send_stream(struct qcs *qcs, int urg)
LIST_APPEND(&qcs->qcc->send_list, &qcs->el_send);
}
+ if (count) {
+ qfctl_sinc(&qcc->tx.fc, count);
+ qfctl_sinc(&qcs->tx.fc, count);
+ }
+
TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
}
@@ -999,7 +1191,7 @@ void qcc_abort_stream_read(struct qcs *qcs)
TRACE_STATE("abort stream read", QMUX_EV_QCS_END, qcc->conn, qcs);
qcs->flags |= (QC_SF_TO_STOP_SENDING|QC_SF_READ_ABORTED);
- qcc_send_stream(qcs, 1);
+ qcc_send_stream(qcs, 1, 0);
tasklet_wakeup(qcc->wait_event.tasklet);
end:
@@ -1203,17 +1395,19 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
*/
int qcc_recv_max_data(struct qcc *qcc, uint64_t max)
{
+ int unblock_soft = 0, unblock_real = 0;
+
TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
TRACE_PROTO("receiving MAX_DATA", QMUX_EV_QCC_RECV, qcc->conn);
- if (qcc->rfctl.md < max) {
- qcc->rfctl.md = max;
+ if (qfctl_set_max(&qcc->tx.fc, max, &unblock_soft, &unblock_real)) {
TRACE_DATA("increase remote max-data", QMUX_EV_QCC_RECV, qcc->conn);
- if (qcc->flags & QC_CF_BLK_MFCTL) {
- qcc->flags &= ~QC_CF_BLK_MFCTL;
+ if (unblock_real)
tasklet_wakeup(qcc->wait_event.tasklet);
- }
+
+ if (unblock_soft)
+ qcc_notify_fctl(qcc);
}
TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
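A hypothetical numeric example of the two unblock flags:

    /* Example: limit=1000, off_soft=1000, off_real=600 -> only the soft side
     * is blocked. A MAX_DATA raising the limit to 2000 sets unblock_soft, so
     * qcc_notify_fctl() wakes the streams parked on fctl_list, while
     * unblock_real stays false and no tasklet wakeup is required. */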
@@ -1249,16 +1443,18 @@ int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max)
goto err;
if (qcs) {
+ int unblock_soft = 0, unblock_real = 0;
+
TRACE_PROTO("receiving MAX_STREAM_DATA", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
- if (max > qcs->tx.msd) {
- qcs->tx.msd = max;
+ if (qfctl_set_max(&qcs->tx.fc, max, &unblock_soft, &unblock_real)) {
TRACE_DATA("increase remote max-stream-data", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
-
- if (qcs->flags & QC_SF_BLK_SFCTL) {
- qcs->flags &= ~QC_SF_BLK_SFCTL;
+ if (unblock_real) {
/* TODO optim: only wakeup IO-CB if stream has data to send. */
tasklet_wakeup(qcc->wait_event.tasklet);
}
+
+ if (unblock_soft)
+ qcs_notify_send(qcs);
}
}
@@ -1410,14 +1606,18 @@ int qcc_recv_stop_sending(struct qcc *qcc, uint64_t id, uint64_t err)
}
}
- /* If FIN already reached, future RESET_STREAMS will be ignored.
- * Manually set EOS in this case.
- */
+ /* Manually set EOS if FIN already reached, as future RESET_STREAM frames will be ignored in this case. */
if (qcs_sc(qcs) && se_fl_test(qcs->sd, SE_FL_EOI)) {
se_fl_set(qcs->sd, SE_FL_EOS);
qcs_alert(qcs);
}
+ /* If not defined yet, set abort info for the sedesc */
+ if (!qcs->sd->abort_info.info) {
+ qcs->sd->abort_info.info = (SE_ABRT_SRC_MUX_QUIC << SE_ABRT_SRC_SHIFT);
+ qcs->sd->abort_info.code = err;
+ }
+
/* RFC 9000 3.5. Solicited State Transitions
*
* An endpoint that receives a STOP_SENDING frame
@@ -1500,12 +1700,12 @@ static void qcs_destroy(struct qcs *qcs)
TRACE_ENTER(QMUX_EV_QCS_END, conn, qcs);
- /* MUST not removed a stream with sending prepared data left. This is
- * to ensure consistency on connection flow-control calculation.
- */
- BUG_ON(qcs->tx.offset < qcs->tx.sent_offset);
+ if (!(qcc->flags & (QC_CF_ERR_CONN|QC_CF_ERRL))) {
+ /* MUST not remove a stream with prepared sending data left. This is
+ * to ensure consistency on connection flow-control calculation.
+ */
+ BUG_ON(qcs->tx.fc.off_soft != qcs->tx.fc.off_real);
- if (!(qcc->flags & QC_CF_ERRL)) {
if (quic_stream_is_remote(qcc, id))
qcc_release_remote_stream(qcc, id);
}
@@ -1515,114 +1715,52 @@ static void qcs_destroy(struct qcs *qcs)
TRACE_LEAVE(QMUX_EV_QCS_END, conn);
}
-/* Transfer as much as possible data on <qcs> from <in> to <out>. This is done
- * in respect with available flow-control at stream and connection level.
+/* Prepare a STREAM frame for <qcs> instance using <out> as payload. The frame
+ * is appended in <frm_list>. Set <fin> if this is supposed to be the last
+ * stream frame. If <out> is NULL an empty STREAM frame is built : this may be
+ * useful if FIN needs to be sent without any data left. Frame length will be
+ * truncated if greater than <fc_conn_wnd>. This allows to prepare several
+ * frames in a loop while respecting connection flow control window.
*
- * Returns the total bytes of transferred data or a negative error code.
+ * Returns the payload length of the STREAM frame or a negative error code.
*/
-static int qcs_xfer_data(struct qcs *qcs, struct buffer *out, struct buffer *in)
+static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin,
+ struct list *frm_list, uint64_t window_conn)
{
struct qcc *qcc = qcs->qcc;
- int left, to_xfer;
- int total = 0;
+ struct quic_frame *frm;
+ const uint64_t window_stream = qfctl_rcap(&qcs->tx.fc);
+ const uint64_t bytes = qcs_prep_bytes(qcs);
+ uint64_t total;
TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
- if (!qcs_get_buf(qcs, out)) {
- TRACE_ERROR("buffer alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
- goto err;
- }
-
- /*
- * QCS out buffer diagram
- * head left to_xfer
- * -------------> ----------> ----->
- * --------------------------------------------------
- * |...............|xxxxxxxxxxx|<<<<<
- * --------------------------------------------------
- * ^ ack-off ^ sent-off ^ off
- *
- * STREAM frame
- * ^ ^
- * |xxxxxxxxxxxxxxxxx|
- */
-
- BUG_ON_HOT(qcs->tx.sent_offset < qcs->stream->ack_offset);
- BUG_ON_HOT(qcs->tx.offset < qcs->tx.sent_offset);
- BUG_ON_HOT(qcc->tx.offsets < qcc->tx.sent_offsets);
+ /* This must only be called if there is data left, or at least a standalone FIN. */
+ BUG_ON((!out || !b_data(out)) && !fin);
- left = qcs->tx.offset - qcs->tx.sent_offset;
- to_xfer = QUIC_MIN(b_data(in), b_room(out));
+ total = bytes;
- BUG_ON_HOT(qcs->tx.offset > qcs->tx.msd);
- /* do not exceed flow control limit */
- if (qcs->tx.offset + to_xfer > qcs->tx.msd) {
+ /* do not exceed stream flow control limit */
+ if (total > window_stream) {
TRACE_DATA("do not exceed stream flow control", QMUX_EV_QCS_SEND, qcc->conn, qcs);
- to_xfer = qcs->tx.msd - qcs->tx.offset;
+ total = window_stream;
}
- BUG_ON_HOT(qcc->tx.offsets > qcc->rfctl.md);
- /* do not overcome flow control limit on connection */
- if (qcc->tx.offsets + to_xfer > qcc->rfctl.md) {
+ /* do not exceed connection flow control limit */
+ if (total > window_conn) {
TRACE_DATA("do not exceed conn flow control", QMUX_EV_QCS_SEND, qcc->conn, qcs);
- to_xfer = qcc->rfctl.md - qcc->tx.offsets;
+ total = window_conn;
}
- if (!left && !to_xfer)
- goto out;
-
- total = b_force_xfer(out, in, to_xfer);
-
- out:
- {
- struct qcs_xfer_data_trace_arg arg = {
- .prep = b_data(out), .xfer = total,
- };
- TRACE_LEAVE(QMUX_EV_QCS_SEND|QMUX_EV_QCS_XFER_DATA,
- qcc->conn, qcs, &arg);
- }
-
- return total;
-
- err:
- TRACE_DEVEL("leaving on error", QMUX_EV_QCS_SEND, qcc->conn, qcs);
- return -1;
-}
-
-/* Prepare a STREAM frame for <qcs> instance using <out> as payload. The frame
- * is appended in <frm_list>. Set <fin> if this is supposed to be the last
- * stream frame. If <out> is NULL an empty STREAM frame is built : this may be
- * useful if FIN needs to be sent without any data left.
- *
- * Returns the payload length of the STREAM frame or a negative error code.
- */
-static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin,
- struct list *frm_list)
-{
- struct qcc *qcc = qcs->qcc;
- struct quic_frame *frm;
- int head, total;
- uint64_t base_off;
-
- TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
-
- /* if ack_offset < buf_offset, it points to an older buffer. */
- base_off = MAX(qcs->stream->buf_offset, qcs->stream->ack_offset);
- BUG_ON(qcs->tx.sent_offset < base_off);
-
- head = qcs->tx.sent_offset - base_off;
- total = out ? b_data(out) - head : 0;
- BUG_ON(total < 0);
+ /* Reset FIN if the bytes to send are capped by flow control. */
+ if (total < bytes)
+ fin = 0;
if (!total && !fin) {
/* No need to send anything if total is NULL and no FIN to signal. */
TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
return 0;
}
- BUG_ON((!total && qcs->tx.sent_offset > qcs->tx.offset) ||
- (total && qcs->tx.sent_offset >= qcs->tx.offset));
- BUG_ON(qcs->tx.sent_offset + total > qcs->tx.offset);
- BUG_ON(qcc->tx.sent_offsets + total > qcc->rfctl.md);
TRACE_PROTO("sending STREAM frame", QMUX_EV_QCS_SEND, qcc->conn, qcs);
frm = qc_frm_alloc(QUIC_FT_STREAM_8);
@@ -1638,7 +1776,7 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin,
if (total) {
frm->stream.buf = out;
- frm->stream.data = (unsigned char *)b_peek(out, head);
+ frm->stream.data = (unsigned char *)b_peek(out, b_data(out) - bytes);
}
else {
/* Empty STREAM frame. */
@@ -1650,9 +1788,9 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin,
if (fin)
frm->type |= QUIC_STREAM_FRAME_TYPE_FIN_BIT;
- if (qcs->tx.sent_offset) {
+ if (qcs->tx.fc.off_real) {
frm->type |= QUIC_STREAM_FRAME_TYPE_OFF_BIT;
- frm->stream.offset.key = qcs->tx.sent_offset;
+ frm->stream.offset.key = qcs->tx.fc.off_real;
}
/* Always set length bit as we do not know if there are remaining frames
@@ -1680,23 +1818,6 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin,
return -1;
}
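A worked example of the capping above, with hypothetical numbers:

    /* Example: bytes=600 prepared, window_stream=500, window_conn=300.
     * total is capped first to 500 then to 300; since 300 < 600 the FIN bit
     * is cleared and the remaining bytes need another frame once more
     * flow-control credit is received. */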
-/* Check after transferring data from qcs.tx.buf if FIN must be set on the next
- * STREAM frame for <qcs>.
- *
- * Returns true if FIN must be set else false.
- */
-static int qcs_stream_fin(struct qcs *qcs)
-{
- return qcs->flags & QC_SF_FIN_STREAM && !b_data(&qcs->tx.buf);
-}
-
-/* Return true if <qcs> has data to send in new STREAM frames. */
-static forceinline int qcs_need_sending(struct qcs *qcs)
-{
- return b_data(&qcs->tx.buf) || qcs->tx.sent_offset < qcs->tx.offset ||
- qcs_stream_fin(qcs);
-}
-
/* This function must be called by the upper layer to inform about the sending
* of a STREAM frame for <qcs> instance. The frame is of <data> length and on
* <offset>.
@@ -1708,42 +1829,45 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
- BUG_ON(offset > qcs->tx.sent_offset);
- BUG_ON(offset + data > qcs->tx.offset);
+ /* Real off MUST always be the greatest offset sent. */
+ BUG_ON(offset > qcs->tx.fc.off_real);
/* check if the STREAM frame has already been notified. It can happen
* for retransmission.
*/
- if (offset + data < qcs->tx.sent_offset) {
+ if (offset + data < qcs->tx.fc.off_real) {
TRACE_DEVEL("offset already notified", QMUX_EV_QCS_SEND, qcc->conn, qcs);
goto out;
}
qcs_idle_open(qcs);
- diff = offset + data - qcs->tx.sent_offset;
+ diff = offset + data - qcs->tx.fc.off_real;
if (diff) {
+ struct quic_fctl *fc_conn = &qcc->tx.fc;
+ struct quic_fctl *fc_strm = &qcs->tx.fc;
+
+ /* Ensure real offset never exceeds soft value. */
+ BUG_ON(fc_conn->off_real + diff > fc_conn->off_soft);
+ BUG_ON(fc_strm->off_real + diff > fc_strm->off_soft);
+
/* increase offset sum on connection */
- qcc->tx.sent_offsets += diff;
- BUG_ON_HOT(qcc->tx.sent_offsets > qcc->rfctl.md);
- if (qcc->tx.sent_offsets == qcc->rfctl.md) {
- qcc->flags |= QC_CF_BLK_MFCTL;
- TRACE_STATE("connection flow-control reached", QMUX_EV_QCS_SEND, qcc->conn);
+ if (qfctl_rinc(fc_conn, diff)) {
+ TRACE_STATE("connection flow-control reached",
+ QMUX_EV_QCS_SEND, qcc->conn);
}
/* increase offset on stream */
- qcs->tx.sent_offset += diff;
- BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.msd);
- BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.offset);
- if (qcs->tx.sent_offset == qcs->tx.msd) {
- qcs->flags |= QC_SF_BLK_SFCTL;
- TRACE_STATE("stream flow-control reached", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+ if (qfctl_rinc(fc_strm, diff)) {
+ TRACE_STATE("stream flow-control reached",
+ QMUX_EV_QCS_SEND, qcc->conn, qcs);
}
-
- /* If qcs.stream.buf is full, release it to the lower layer. */
- if (qcs->tx.offset == qcs->tx.sent_offset &&
- b_full(&qcs->stream->buf->buf)) {
+ /* Release buffer if everything sent and buf is full or stream is waiting for room. */
+ if (!qcs_prep_bytes(qcs) &&
+ (b_full(&qcs->stream->buf->buf) || qcs->flags & QC_SF_BLK_MROOM)) {
qc_stream_buf_release(qcs->stream);
+ qcs->flags &= ~QC_SF_BLK_MROOM;
+ qcs_notify_send(qcs);
}
/* Add measurement for send rate. This is done at the MUX layer
@@ -1752,7 +1876,7 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
increment_send_rate(diff, 0);
}
- if (qcs->tx.offset == qcs->tx.sent_offset && !b_data(&qcs->tx.buf)) {
+ if (!qc_stream_buf_get(qcs->stream) || !qcs_prep_bytes(qcs)) {
/* Remove stream from send_list if all was sent. */
LIST_DEL_INIT(&qcs->el_send);
TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs);
@@ -1842,7 +1966,7 @@ static int qcs_send_reset(struct qcs *qcs)
frm->reset_stream.id = qcs->id;
frm->reset_stream.app_error_code = qcs->err;
- frm->reset_stream.final_size = qcs->tx.sent_offset;
+ frm->reset_stream.final_size = qcs->tx.fc.off_real;
LIST_APPEND(&frms, &frm->list);
if (qcc_send_frames(qcs->qcc, &frms)) {
@@ -1910,87 +2034,46 @@ static int qcs_send_stop_sending(struct qcs *qcs)
return 0;
}
-/* Used internally by qcc_io_send function. Proceed to send for <qcs>. This will
- * transfer data from qcs buffer to its quic_stream counterpart. A STREAM frame
- * is then generated and inserted in <frms> list.
+/* Used internally by qcc_io_send function. Proceed to send for <qcs>. A STREAM
+ * frame is generated pointing to QCS stream descriptor content and inserted in
+ * <frms> list. Frame length will be truncated if greater than <window_conn>.
+ * This allows preparing several frames in a loop while respecting the
+ * connection flow control window.
*
- * Returns the total bytes transferred between qcs and quic_stream buffers. Can
- * be null if out buffer cannot be allocated. On error a negative error code is
- * used.
+ * Returns the payload length of the STREAM frame or a negative error code.
*/
-static int qcs_send(struct qcs *qcs, struct list *frms)
+static int qcs_send(struct qcs *qcs, struct list *frms, uint64_t window_conn)
{
struct qcc *qcc = qcs->qcc;
- struct buffer *buf = &qcs->tx.buf;
struct buffer *out = qc_stream_buf_get(qcs->stream);
- int xfer = 0, buf_avail;
- char fin = 0;
+ int flen = 0;
+ const char fin = qcs->flags & QC_SF_FIN_STREAM;
TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
/* Cannot send STREAM on remote unidirectional streams. */
BUG_ON(quic_stream_is_uni(qcs->id) && quic_stream_is_remote(qcc, qcs->id));
- if (b_data(buf)) {
- /* Allocate <out> buffer if not already done. */
- if (!out) {
- if (qcc->flags & QC_CF_CONN_FULL)
- goto out;
-
- out = qc_stream_buf_alloc(qcs->stream, qcs->tx.offset,
- &buf_avail);
- if (!out) {
- if (buf_avail) {
- TRACE_ERROR("stream desc alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
- goto err;
- }
-
- TRACE_STATE("hitting stream desc buffer limit", QMUX_EV_QCS_SEND, qcc->conn, qcs);
- qcc->flags |= QC_CF_CONN_FULL;
- goto out;
- }
- }
-
- /* Transfer data from <buf> to <out>. */
- xfer = qcs_xfer_data(qcs, out, buf);
- if (xfer < 0)
- goto err;
-
- if (xfer > 0) {
- qcs_notify_send(qcs);
- qcs->flags &= ~QC_SF_BLK_MROOM;
- }
+ /* This function must not be called if there is nothing to send. */
+ BUG_ON(!fin && !qcs_prep_bytes(qcs));
- qcs->tx.offset += xfer;
- BUG_ON_HOT(qcs->tx.offset > qcs->tx.msd);
- qcc->tx.offsets += xfer;
- BUG_ON_HOT(qcc->tx.offsets > qcc->rfctl.md);
-
- /* out buffer cannot be emptied if qcs offsets differ. */
- BUG_ON(!b_data(out) && qcs->tx.sent_offset != qcs->tx.offset);
+ /* Skip STREAM frame allocation if already subscribed for send.
+ * Happens on sendto transient error or network congestion.
+ */
+ if (qcc->wait_event.events & SUB_RETRY_SEND) {
+ TRACE_DEVEL("already subscribed for sending",
+ QMUX_EV_QCS_SEND, qcc->conn, qcs);
+ goto err;
}
- /* FIN is set if all incoming data were transferred. */
- fin = qcs_stream_fin(qcs);
-
/* Build a new STREAM frame with <out> buffer. */
- if (qcs->tx.sent_offset != qcs->tx.offset || fin) {
- /* Skip STREAM frame allocation if already subscribed for send.
- * Happens on sendto transient error or network congestion.
- */
- if (qcc->wait_event.events & SUB_RETRY_SEND) {
- TRACE_DEVEL("already subscribed for sending",
- QMUX_EV_QCS_SEND, qcc->conn, qcs);
- goto err;
- }
-
- if (qcs_build_stream_frm(qcs, out, fin, frms) < 0)
- goto err;
- }
+ flen = qcs_build_stream_frm(qcs, out, fin, frms, window_conn);
+ if (flen < 0)
+ goto err;
out:
TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
- return xfer;
+ return flen;
err:
TRACE_DEVEL("leaving on error", QMUX_EV_QCS_SEND, qcc->conn, qcs);
@@ -2008,7 +2091,8 @@ static int qcc_io_send(struct qcc *qcc)
/* Temporary list for QCS on error. */
struct list qcs_failed = LIST_HEAD_INIT(qcs_failed);
struct qcs *qcs, *qcs_tmp, *first_qcs = NULL;
- int ret, total = 0;
+ uint64_t window_conn = qfctl_rcap(&qcc->tx.fc);
+ int ret, total = 0, resent;
TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
@@ -2055,8 +2139,8 @@ static int qcc_io_send(struct qcc *qcc)
break;
/* Stream must not be present in send_list if it has nothing to send. */
- BUG_ON(!(qcs->flags & (QC_SF_TO_STOP_SENDING|QC_SF_TO_RESET)) &&
- !qcs_need_sending(qcs));
+ BUG_ON(!(qcs->flags & (QC_SF_FIN_STREAM|QC_SF_TO_STOP_SENDING|QC_SF_TO_RESET)) &&
+ (!qcs->stream || !qcs_prep_bytes(qcs)));
/* Each STOP_SENDING/RESET_STREAM frame is sent individually to
* guarantee its emission.
@@ -2070,7 +2154,8 @@ static int qcc_io_send(struct qcc *qcc)
/* Remove stream from send_list if it had only STOP_SENDING
* to send.
*/
- if (!(qcs->flags & QC_SF_TO_RESET) && !qcs_need_sending(qcs)) {
+ if (!(qcs->flags & (QC_SF_FIN_STREAM|QC_SF_TO_RESET)) &&
+ (!qcs->stream || !qcs_prep_bytes(qcs))) {
LIST_DEL_INIT(&qcs->el_send);
continue;
}
@@ -2091,9 +2176,12 @@ static int qcc_io_send(struct qcc *qcc)
continue;
}
- if (!(qcc->flags & QC_CF_BLK_MFCTL) &&
- !(qcs->flags & QC_SF_BLK_SFCTL)) {
- if ((ret = qcs_send(qcs, &frms)) < 0) {
+ /* Total sent bytes must not exceed connection window. */
+ BUG_ON(total > window_conn);
+
+ if (!qfctl_rblocked(&qcc->tx.fc) &&
+ !qfctl_rblocked(&qcs->tx.fc) && window_conn > total) {
+ if ((ret = qcs_send(qcs, &frms, window_conn - total)) < 0) {
/* Temporarily remove QCS from send-list. */
LIST_DEL_INIT(&qcs->el_send);
LIST_APPEND(&qcs_failed, &qcs->el_send);
@@ -2117,7 +2205,10 @@ static int qcc_io_send(struct qcc *qcc)
/* Retry sending until no frame to send, data rejected or connection
* flow-control limit reached.
*/
- while (qcc_send_frames(qcc, &frms) == 0 && !(qcc->flags & QC_CF_BLK_MFCTL)) {
+ while (qcc_send_frames(qcc, &frms) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
+ window_conn = qfctl_rcap(&qcc->tx.fc);
+ resent = 0;
+
/* Reloop over <qcc.send_list>. Useful for streams which have
* fulfilled their qc_stream_desc buf and have now release it.
*/
@@ -2126,16 +2217,20 @@ static int qcc_io_send(struct qcc *qcc)
* new qc_stream_desc should be present in send_list as
* long as transport layer can handle all data.
*/
- BUG_ON(qcs->stream->buf && !(qcs->flags & QC_SF_BLK_SFCTL));
+ BUG_ON(qcs->stream->buf && !qfctl_rblocked(&qcs->tx.fc));
+
+ /* Total sent bytes must not exceed connection window. */
+ BUG_ON(resent > window_conn);
- if (!(qcs->flags & QC_SF_BLK_SFCTL)) {
- if ((ret = qcs_send(qcs, &frms)) < 0) {
+ if (!qfctl_rblocked(&qcs->tx.fc) && window_conn > resent) {
+ if ((ret = qcs_send(qcs, &frms, window_conn - resent)) < 0) {
LIST_DEL_INIT(&qcs->el_send);
LIST_APPEND(&qcs_failed, &qcs->el_send);
continue;
}
total += ret;
+ resent += ret;
}
}
}
@@ -2156,7 +2251,7 @@ static int qcc_io_send(struct qcc *qcc)
LIST_APPEND(&qcc->send_list, &qcs->el_send);
}
- if (!(qcc->flags & QC_CF_BLK_MFCTL))
+ if (!qfctl_rblocked(&qcc->tx.fc))
tasklet_wakeup(qcc->wait_event.tasklet);
}
@@ -2276,7 +2371,7 @@ static void qcc_shutdown(struct qcc *qcc)
qcc_io_send(qcc);
}
else {
- qcc->err = quic_err_app(QC_ERR_NO_ERROR);
+ qcc->err = quic_err_transport(QC_ERR_NO_ERROR);
}
/* Register "no error" code at transport layer. Do not use
@@ -2381,9 +2476,7 @@ static int qcc_io_process(struct qcc *qcc)
return 0;
}
-/* release function. This one should be called to free all resources allocated
- * to the mux.
- */
+/* Free all resources allocated for <qcc> connection. */
static void qcc_release(struct qcc *qcc)
{
struct connection *conn = qcc->conn;
@@ -2391,8 +2484,6 @@ static void qcc_release(struct qcc *qcc)
TRACE_ENTER(QMUX_EV_QCC_END, conn);
- qcc_shutdown(qcc);
-
if (qcc->task) {
task_destroy(qcc->task);
qcc->task = NULL;
@@ -2465,6 +2556,7 @@ struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
return NULL;
release:
+ qcc_shutdown(qcc);
qcc_release(qcc);
TRACE_LEAVE(QMUX_EV_QCC_WAKE);
return NULL;
@@ -2507,6 +2599,7 @@ static struct task *qcc_timeout_task(struct task *t, void *ctx, unsigned int sta
*/
if (qcc_is_dead(qcc)) {
TRACE_STATE("releasing dead connection", QMUX_EV_QCC_WAKE, qcc->conn);
+ qcc_shutdown(qcc);
qcc_release(qcc);
}
@@ -2519,6 +2612,17 @@ static struct task *qcc_timeout_task(struct task *t, void *ctx, unsigned int sta
return t;
}
+/* Minimal initialization of <qcc> members to use qcc_release() safely. */
+static void _qcc_init(struct qcc *qcc)
+{
+ qcc->conn = NULL;
+ qcc->task = NULL;
+ qcc->wait_event.tasklet = NULL;
+ qcc->app_ops = NULL;
+ qcc->streams_by_id = EB_ROOT_UNIQUE;
+ LIST_INIT(&qcc->lfctl.frms);
+}
+
static int qmux_init(struct connection *conn, struct proxy *prx,
struct session *sess, struct buffer *input)
{
@@ -2530,24 +2634,19 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
qcc = pool_alloc(pool_head_qcc);
if (!qcc) {
TRACE_ERROR("alloc failure", QMUX_EV_QCC_NEW);
- goto fail_no_qcc;
+ goto err;
}
- qcc->conn = conn;
+ _qcc_init(qcc);
conn->ctx = qcc;
qcc->nb_hreq = qcc->nb_sc = 0;
qcc->flags = 0;
-
- qcc->app_ops = NULL;
-
- qcc->streams_by_id = EB_ROOT_UNIQUE;
+ qcc->glitches = 0;
+ qcc->err = quic_err_transport(QC_ERR_NO_ERROR);
/* Server parameters, params used for RX flow control. */
lparams = &conn->handle.qc->rx.params;
- qcc->tx.sent_offsets = qcc->tx.offsets = 0;
-
- LIST_INIT(&qcc->lfctl.frms);
qcc->lfctl.ms_bidi = qcc->lfctl.ms_bidi_init = lparams->initial_max_streams_bidi;
qcc->lfctl.ms_uni = lparams->initial_max_streams_uni;
qcc->lfctl.msd_bidi_l = lparams->initial_max_stream_data_bidi_local;
@@ -2559,7 +2658,7 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
qcc->lfctl.offsets_recv = qcc->lfctl.offsets_consume = 0;
rparams = &conn->handle.qc->tx.params;
- qcc->rfctl.md = rparams->initial_max_data;
+ qfctl_init(&qcc->tx.fc, rparams->initial_max_data);
qcc->rfctl.msd_bidi_l = rparams->initial_max_stream_data_bidi_local;
qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote;
qcc->rfctl.msd_uni_l = rparams->initial_max_stream_data_uni;
@@ -2580,10 +2679,12 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
qcc->wait_event.tasklet = tasklet_new();
if (!qcc->wait_event.tasklet) {
TRACE_ERROR("taslket alloc failure", QMUX_EV_QCC_NEW);
- goto fail_no_tasklet;
+ goto err;
}
LIST_INIT(&qcc->send_list);
+ LIST_INIT(&qcc->fctl_list);
+ LIST_INIT(&qcc->buf_wait_list);
qcc->wait_event.tasklet->process = qcc_io_cb;
qcc->wait_event.tasklet->context = qcc;
@@ -2591,7 +2692,7 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
qcc->proxy = prx;
/* haproxy timeouts */
- if (conn_is_back(qcc->conn)) {
+ if (conn_is_back(conn)) {
qcc->timeout = prx->timeout.server;
qcc->shut_timeout = tick_isset(prx->timeout.serverfin) ?
prx->timeout.serverfin : prx->timeout.server;
@@ -2608,7 +2709,7 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
qcc->task = task_new_here();
if (!qcc->task) {
TRACE_ERROR("timeout task alloc failure", QMUX_EV_QCC_NEW);
- goto fail_no_timeout_task;
+ goto err;
}
qcc->task->process = qcc_timeout_task;
qcc->task->context = qcc;
@@ -2619,11 +2720,12 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
HA_ATOMIC_STORE(&conn->handle.qc->qcc, qcc);
+ /* Register conn as app_ops may use it. */
+ qcc->conn = conn;
+
if (qcc_install_app_ops(qcc, conn->handle.qc->app_ops)) {
- TRACE_PROTO("Cannot install app layer", QMUX_EV_QCC_NEW|QMUX_EV_QCC_ERR, qcc->conn);
- /* prepare a CONNECTION_CLOSE frame */
- quic_set_connection_close(conn->handle.qc, quic_err_transport(QC_ERR_APPLICATION_ERROR));
- goto fail_install_app_ops;
+ TRACE_PROTO("Cannot install app layer", QMUX_EV_QCC_NEW|QMUX_EV_QCC_ERR, conn);
+ goto err;
}
if (qcc->app_ops == &h3_ops)
@@ -2636,19 +2738,24 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
/* init read cycle */
tasklet_wakeup(qcc->wait_event.tasklet);
- TRACE_LEAVE(QMUX_EV_QCC_NEW, qcc->conn);
+ TRACE_LEAVE(QMUX_EV_QCC_NEW, conn);
return 0;
- fail_install_app_ops:
- if (qcc->app_ops && qcc->app_ops->release)
- qcc->app_ops->release(qcc->ctx);
- task_destroy(qcc->task);
- fail_no_timeout_task:
- tasklet_free(qcc->wait_event.tasklet);
- fail_no_tasklet:
- pool_free(pool_head_qcc, qcc);
- fail_no_qcc:
- TRACE_LEAVE(QMUX_EV_QCC_NEW);
+ err:
+ /* Prepare CONNECTION_CLOSE, using INTERNAL_ERROR as fallback code if unset. */
+ if (!(conn->handle.qc->flags & QUIC_FL_CONN_IMMEDIATE_CLOSE)) {
+ struct quic_err err = qcc && qcc->err.code ?
+ qcc->err : quic_err_transport(QC_ERR_INTERNAL_ERROR);
+ quic_set_connection_close(conn->handle.qc, err);
+ }
+
+ if (qcc) {
+ /* In case of MUX init failure, session will ensure connection is freed. */
+ qcc->conn = NULL;
+ qcc_release(qcc);
+ }
+
+ TRACE_DEVEL("leaving on error", QMUX_EV_QCC_NEW, conn);
return -1;
}
@@ -2704,6 +2811,7 @@ static void qmux_strm_detach(struct sedesc *sd)
return;
release:
+ qcc_shutdown(qcc);
qcc_release(qcc);
TRACE_LEAVE(QMUX_EV_STRM_END);
return;
@@ -2786,11 +2894,18 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf,
size_t count, int flags)
{
struct qcs *qcs = __sc_mux_strm(sc);
+ const size_t old_data = qcs_prep_bytes(qcs);
size_t ret = 0;
char fin;
TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
+ /* Stream must not be woken up if already waiting for conn buffer. */
+ BUG_ON(LIST_INLIST(&qcs->el_buf));
+
+ /* Sending forbidden if QCS is locally closed (FIN or RESET_STREAM sent). */
+ BUG_ON(qcs_is_close_local(qcs) || (qcs->flags & QC_SF_TO_RESET));
+
/* stream layer has been detached so no transfer must occur after. */
BUG_ON_HOT(qcs->flags & QC_SF_DETACH);
@@ -2801,8 +2916,20 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf,
goto end;
}
- if (qcs_is_close_local(qcs) || (qcs->flags & QC_SF_TO_RESET)) {
- ret = qcs_http_reset_buf(qcs, buf, count);
+ if (qfctl_sblocked(&qcs->qcc->tx.fc)) {
+ TRACE_DEVEL("leaving on connection flow control",
+ QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
+ if (!LIST_INLIST(&qcs->el_fctl)) {
+ TRACE_DEVEL("append to fctl-list",
+ QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
+ LIST_APPEND(&qcs->qcc->fctl_list, &qcs->el_fctl);
+ }
+ goto end;
+ }
+
+ if (qfctl_sblocked(&qcs->tx.fc)) {
+ TRACE_DEVEL("leaving on flow-control reached",
+ QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
goto end;
}
@@ -2813,7 +2940,9 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf,
}
if (ret || fin) {
- qcc_send_stream(qcs, 0);
+ const size_t data = qcs_prep_bytes(qcs) - old_data;
+ if (data || fin)
+ qcc_send_stream(qcs, 0, data);
if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND))
tasklet_wakeup(qcs->qcc->wait_event.tasklet);
}
@@ -2825,18 +2954,25 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf,
}
-static size_t qmux_nego_ff(struct stconn *sc, struct buffer *input, size_t count, unsigned int may_splice)
+static size_t qmux_strm_nego_ff(struct stconn *sc, struct buffer *input,
+ size_t count, unsigned int flags)
{
struct qcs *qcs = __sc_mux_strm(sc);
size_t ret = 0;
TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
+ /* Stream must not be woken up if already waiting for conn buffer. */
+ BUG_ON(LIST_INLIST(&qcs->el_buf));
+
+ /* Sending forbidden if QCS is locally closed (FIN or RESET_STREAM sent). */
+ BUG_ON(qcs_is_close_local(qcs) || (qcs->flags & QC_SF_TO_RESET));
+
/* stream layer has been detached so no transfer must occur after. */
BUG_ON_HOT(qcs->flags & QC_SF_DETACH);
if (!qcs->qcc->app_ops->nego_ff || !qcs->qcc->app_ops->done_ff) {
- /* Fast forwading is not supported by the QUIC application layer */
+ /* Fast forwarding is not supported by the QUIC application layer */
qcs->sd->iobuf.flags |= IOBUF_FL_NO_FF;
goto end;
}
@@ -2850,6 +2986,22 @@ static size_t qmux_nego_ff(struct stconn *sc, struct buffer *input, size_t count
goto end;
}
+ if (qfctl_sblocked(&qcs->qcc->tx.fc)) {
+ TRACE_DEVEL("leaving on connection flow control", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
+ if (!LIST_INLIST(&qcs->el_fctl)) {
+ TRACE_DEVEL("append to fctl-list", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
+ LIST_APPEND(&qcs->qcc->fctl_list, &qcs->el_fctl);
+ }
+ qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
+ goto end;
+ }
+
+ if (qfctl_sblocked(&qcs->tx.fc)) {
+ TRACE_DEVEL("leaving on flow-control reached", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
+ qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
+ goto end;
+ }
+
/* Always disable splicing */
qcs->sd->iobuf.flags |= IOBUF_FL_NO_SPLICING;
@@ -2880,36 +3032,37 @@ static size_t qmux_nego_ff(struct stconn *sc, struct buffer *input, size_t count
return ret;
}
-static size_t qmux_done_ff(struct stconn *sc)
+static size_t qmux_strm_done_ff(struct stconn *sc)
{
struct qcs *qcs = __sc_mux_strm(sc);
struct qcc *qcc = qcs->qcc;
struct sedesc *sd = qcs->sd;
- size_t total = 0;
+ size_t total = 0, data = sd->iobuf.data;
TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
- if (sd->iobuf.flags & IOBUF_FL_EOI)
+ if (sd->iobuf.flags & IOBUF_FL_EOI) {
+ TRACE_STATE("reached stream fin", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
qcs->flags |= QC_SF_FIN_STREAM;
+ }
if (!(qcs->flags & QC_SF_FIN_STREAM) && !sd->iobuf.data)
goto end;
+ data += sd->iobuf.offset;
total = qcs->qcc->app_ops->done_ff(qcs);
- qcc_send_stream(qcs, 0);
+ if (data || qcs->flags & QC_SF_FIN_STREAM)
+ qcc_send_stream(qcs, 0, data);
if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND))
tasklet_wakeup(qcc->wait_event.tasklet);
end:
- if (!b_data(&qcs->tx.buf))
- b_free(&qcs->tx.buf);
-
TRACE_LEAVE(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
return total;
}
-static int qmux_resume_ff(struct stconn *sc, unsigned int flags)
+static int qmux_strm_resume_ff(struct stconn *sc, unsigned int flags)
{
return 0;
}
@@ -2962,16 +3115,20 @@ static int qmux_wake(struct connection *conn)
return 0;
release:
+ qcc_shutdown(qcc);
qcc_release(qcc);
TRACE_LEAVE(QMUX_EV_QCC_WAKE);
return 1;
}
-static void qmux_strm_shutw(struct stconn *sc, enum co_shw_mode mode)
+static void qmux_strm_shut(struct stconn *sc, enum se_shut_mode mode, struct se_abort_info *reason)
{
struct qcs *qcs = __sc_mux_strm(sc);
struct qcc *qcc = qcs->qcc;
+ if (!(mode & (SE_SHW_SILENT|SE_SHW_NORMAL)))
+ return;
+
TRACE_ENTER(QMUX_EV_STRM_SHUT, qcc->conn, qcs);
/* Early closure reported if QC_SF_FIN_STREAM not yet set. */
@@ -2984,7 +3141,7 @@ static void qmux_strm_shutw(struct stconn *sc, enum co_shw_mode mode)
TRACE_STATE("set FIN STREAM",
QMUX_EV_STRM_SHUT, qcc->conn, qcs);
qcs->flags |= QC_SF_FIN_STREAM;
- qcc_send_stream(qcs, 0);
+ qcc_send_stream(qcs, 0, 0);
}
}
else {
@@ -2999,6 +3156,34 @@ static void qmux_strm_shutw(struct stconn *sc, enum co_shw_mode mode)
TRACE_LEAVE(QMUX_EV_STRM_SHUT, qcc->conn, qcs);
}
+static int qmux_ctl(struct connection *conn, enum mux_ctl_type mux_ctl, void *output)
+{
+ struct qcc *qcc = conn->ctx;
+
+ switch (mux_ctl) {
+ case MUX_CTL_EXIT_STATUS:
+ return MUX_ES_UNKNOWN;
+
+ case MUX_CTL_GET_GLITCHES:
+ return qcc->glitches;
+
+ case MUX_CTL_GET_NBSTRM: {
+ struct qcs *qcs;
+ unsigned int nb_strm = qcc->nb_sc;
+
+ list_for_each_entry(qcs, &qcc->opening_list, el_opening)
+ nb_strm++;
+ return nb_strm;
+ }
+
+ case MUX_CTL_GET_MAXSTRM:
+ return qcc->lfctl.ms_bidi_init;
+
+ default:
+ return -1;
+ }
+}
+
static int qmux_sctl(struct stconn *sc, enum mux_sctl_type mux_sctl, void *output)
{
int ret = 0;
@@ -3048,19 +3233,41 @@ static const struct mux_ops qmux_ops = {
.detach = qmux_strm_detach,
.rcv_buf = qmux_strm_rcv_buf,
.snd_buf = qmux_strm_snd_buf,
- .nego_fastfwd = qmux_nego_ff,
- .done_fastfwd = qmux_done_ff,
- .resume_fastfwd = qmux_resume_ff,
+ .nego_fastfwd = qmux_strm_nego_ff,
+ .done_fastfwd = qmux_strm_done_ff,
+ .resume_fastfwd = qmux_strm_resume_ff,
.subscribe = qmux_strm_subscribe,
.unsubscribe = qmux_strm_unsubscribe,
.wake = qmux_wake,
- .shutw = qmux_strm_shutw,
+ .shut = qmux_strm_shut,
+ .ctl = qmux_ctl,
.sctl = qmux_sctl,
.show_sd = qmux_strm_show_sd,
.flags = MX_FL_HTX|MX_FL_NO_UPG|MX_FL_FRAMED,
.name = "QUIC",
};
+void qcc_show_quic(struct qcc *qcc)
+{
+ struct eb64_node *node;
+ chunk_appendf(&trash, " qcc=0x%p flags=0x%x sc=%llu hreq=%llu\n",
+ qcc, qcc->flags, (ullong)qcc->nb_sc, (ullong)qcc->nb_hreq);
+
+ node = eb64_first(&qcc->streams_by_id);
+ while (node) {
+ struct qcs *qcs = eb64_entry(node, struct qcs, by_id);
+ chunk_appendf(&trash, " qcs=0x%p id=%llu flags=0x%x st=%s",
+ qcs, (ullong)qcs->id, qcs->flags,
+ qcs_st_to_str(qcs->st));
+ if (!quic_stream_is_uni(qcs->id) || !quic_stream_is_local(qcc, qcs->id))
+ chunk_appendf(&trash, " rxoff=%llu", (ullong)qcs->rx.offset);
+ if (!quic_stream_is_uni(qcs->id) || !quic_stream_is_remote(qcc, qcs->id))
+ chunk_appendf(&trash, " txoff=%llu", (ullong)qcs->tx.fc.off_real);
+ chunk_appendf(&trash, "\n");
+ node = eb64_next(node);
+ }
+}
+
static struct mux_proto_list mux_proto_quic =
{ .token = IST("quic"), .mode = PROTO_MODE_HTTP, .side = PROTO_SIDE_FE, .mux = &qmux_ops };