diff options
Diffstat (limited to '')
-rw-r--r-- | src/mux_quic.c | 853 |
1 files changed, 530 insertions, 323 deletions
diff --git a/src/mux_quic.c b/src/mux_quic.c index 05c92fa..ae504ee 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -3,6 +3,7 @@ #include <import/eb64tree.h> #include <haproxy/api.h> +#include <haproxy/chunk.h> #include <haproxy/connection.h> #include <haproxy/dynbuf.h> #include <haproxy/h3.h> @@ -13,6 +14,7 @@ #include <haproxy/qmux_http.h> #include <haproxy/qmux_trace.h> #include <haproxy/quic_conn.h> +#include <haproxy/quic_fctl.h> #include <haproxy/quic_frame.h> #include <haproxy/quic_sock.h> #include <haproxy/quic_stream.h> @@ -58,6 +60,8 @@ static void qcs_free(struct qcs *qcs) /* Safe to use even if already removed from the list. */ LIST_DEL_INIT(&qcs->el_opening); LIST_DEL_INIT(&qcs->el_send); + LIST_DEL_INIT(&qcs->el_fctl); + LIST_DEL_INIT(&qcs->el_buf); /* Release stream endpoint descriptor. */ BUG_ON(qcs->sd && !se_fl_test(qcs->sd, SE_FL_ORPHAN)); @@ -68,11 +72,10 @@ static void qcs_free(struct qcs *qcs) qcc->app_ops->detach(qcs); /* Release qc_stream_desc buffer from quic-conn layer. */ - qc_stream_desc_release(qcs->stream, qcs->tx.sent_offset); + qc_stream_desc_release(qcs->stream, qcs->tx.fc.off_real); - /* Free Rx/Tx buffers. */ + /* Free Rx buffer. */ qcs_free_ncbuf(qcs, &qcs->rx.ncbuf); - b_free(&qcs->tx.buf); /* Remove qcs from qcc tree. */ eb64_delete(&qcs->by_id); @@ -97,34 +100,45 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) qcs->stream = NULL; qcs->qcc = qcc; - qcs->sd = NULL; qcs->flags = QC_SF_NONE; qcs->st = QC_SS_IDLE; qcs->ctx = NULL; + qcs->sd = sedesc_new(); + if (!qcs->sd) + goto err; + qcs->sd->se = qcs; + qcs->sd->conn = qcc->conn; + se_fl_set(qcs->sd, SE_FL_T_MUX | SE_FL_ORPHAN | SE_FL_NOT_FIRST); + se_expect_no_data(qcs->sd); + + if (!(global.tune.no_zero_copy_fwd & NO_ZERO_COPY_FWD_QUIC_SND)) + se_fl_set(qcs->sd, SE_FL_MAY_FASTFWD_CONS); + /* App callback attach may register the stream for http-request wait. * These fields must be initialed before. */ LIST_INIT(&qcs->el_opening); LIST_INIT(&qcs->el_send); + LIST_INIT(&qcs->el_fctl); + LIST_INIT(&qcs->el_buf); qcs->start = TICK_ETERNITY; /* store transport layer stream descriptor in qcc tree */ qcs->id = qcs->by_id.key = id; eb64_insert(&qcc->streams_by_id, &qcs->by_id); - /* If stream is local, use peer remote-limit, or else the opposite. */ + /* Different limits can be set by the peer for local and remote bidi streams. */ if (quic_stream_is_bidi(id)) { - qcs->tx.msd = quic_stream_is_local(qcc, id) ? qcc->rfctl.msd_bidi_r : - qcc->rfctl.msd_bidi_l; + qfctl_init(&qcs->tx.fc, quic_stream_is_local(qcc, id) ? + qcc->rfctl.msd_bidi_r : qcc->rfctl.msd_bidi_l); } else if (quic_stream_is_local(qcc, id)) { - qcs->tx.msd = qcc->rfctl.msd_uni_l; + qfctl_init(&qcs->tx.fc, qcc->rfctl.msd_uni_l); + } + else { + qfctl_init(&qcs->tx.fc, 0); } - - /* Properly set flow-control blocking if initial MSD is nul. */ - if (!qcs->tx.msd) - qcs->flags |= QC_SF_BLK_SFCTL; qcs->rx.ncbuf = NCBUF_NULL; qcs->rx.app_buf = BUF_NULL; @@ -139,10 +153,6 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) } qcs->rx.msd_init = qcs->rx.msd; - qcs->tx.buf = BUF_NULL; - qcs->tx.offset = 0; - qcs->tx.sent_offset = 0; - qcs->wait_event.tasklet = NULL; qcs->wait_event.events = 0; qcs->subs = NULL; @@ -423,15 +433,6 @@ int qcs_is_close_remote(struct qcs *qcs) return qcs->st == QC_SS_HREM || qcs->st == QC_SS_CLO; } -/* Allocate if needed buffer <bptr> for stream <qcs>. - * - * Returns the buffer instance or NULL on allocation failure. - */ -struct buffer *qcs_get_buf(struct qcs *qcs, struct buffer *bptr) -{ - return b_alloc(bptr); -} - /* Allocate if needed buffer <ncbuf> for stream <qcs>. * * Returns the buffer instance or NULL on allocation failure. @@ -441,7 +442,7 @@ static struct ncbuf *qcs_get_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf) struct buffer buf = BUF_NULL; if (ncb_is_null(ncbuf)) { - if (!b_alloc(&buf)) + if (!b_alloc(&buf, DB_MUX_RX)) return NULL; *ncbuf = ncb_make(buf.area, buf.size, 0); @@ -511,6 +512,35 @@ void qcs_notify_send(struct qcs *qcs) } } +/* Notify on a new stream-desc buffer available for <qcc> connection. + * + * Returns true if a stream was woken up. If false is returned, this indicates + * to the caller that it's currently unnecessary to notify for the rest of the + * available buffers. + */ +int qcc_notify_buf(struct qcc *qcc) +{ + struct qcs *qcs; + int ret = 0; + + TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn); + + if (qcc->flags & QC_CF_CONN_FULL) { + TRACE_STATE("new stream desc buffer available", QMUX_EV_QCC_WAKE, qcc->conn); + qcc->flags &= ~QC_CF_CONN_FULL; + } + + if (!LIST_ISEMPTY(&qcc->buf_wait_list)) { + qcs = LIST_ELEM(qcc->buf_wait_list.n, struct qcs *, el_buf); + LIST_DEL_INIT(&qcs->el_buf); + qcs_notify_send(qcs); + ret = 1; + } + + TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn); + return ret; +} + /* A fatal error is detected locally for <qcc> connection. It should be closed * with a CONNECTION_CLOSE using <err> code. Set <app> to true to indicate that * the code must be considered as an application level error. This function @@ -536,6 +566,28 @@ void qcc_set_error(struct qcc *qcc, int err, int app) tasklet_wakeup(qcc->wait_event.tasklet); } +/* Increment glitch counter for <qcc> connection by <inc> steps. If configured + * threshold reached, close the connection with an error code. + */ +int qcc_report_glitch(struct qcc *qcc, int inc) +{ + const int max = global.tune.quic_frontend_glitches_threshold; + + qcc->glitches += inc; + if (max && qcc->glitches >= max && !(qcc->flags & QC_CF_ERRL)) { + if (qcc->app_ops->report_susp) { + qcc->app_ops->report_susp(qcc->ctx); + qcc_set_error(qcc, qcc->err.code, 1); + } + else { + qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0); + } + return 1; + } + + return 0; +} + /* Open a locally initiated stream for the connection <qcc>. Set <bidi> for a * bidirectional stream, else an unidirectional stream is opened. The next * available ID on the connection will be used according to the stream type. @@ -650,17 +702,6 @@ struct stconn *qcs_attach_sc(struct qcs *qcs, struct buffer *buf, char fin) struct qcc *qcc = qcs->qcc; struct session *sess = qcc->conn->owner; - qcs->sd = sedesc_new(); - if (!qcs->sd) - return NULL; - - qcs->sd->se = qcs; - qcs->sd->conn = qcc->conn; - se_fl_set(qcs->sd, SE_FL_T_MUX | SE_FL_ORPHAN | SE_FL_NOT_FIRST); - se_expect_no_data(qcs->sd); - - if (!(global.tune.no_zero_copy_fwd & NO_ZERO_COPY_FWD_QUIC_SND)) - se_fl_set(qcs->sd, SE_FL_MAY_FASTFWD_CONS); /* TODO duplicated from mux_h2 */ sess->t_idle = ns_to_ms(now_ns - sess->accept_ts) - sess->t_handshake; @@ -899,7 +940,7 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs) fin = 1; if (!(qcs->flags & QC_SF_READ_ABORTED)) { - ret = qcc->app_ops->decode_qcs(qcs, &b, fin); + ret = qcc->app_ops->rcv_buf(qcs, &b, fin); if (ret < 0) { TRACE_ERROR("decoding error", QMUX_EV_QCS_RECV, qcc->conn, qcs); goto err; @@ -930,25 +971,170 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs) return 1; } +/* Allocate if needed and retrieve <qcs> stream buffer for data reception. + * + * Returns buffer pointer. May be NULL on allocation failure. + */ +struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs) +{ + return b_alloc(&qcs->rx.app_buf, DB_MUX_RX); +} + +/* Allocate if needed and retrieve <qcs> stream buffer for data emission. + * + * <err> is an output argument which is useful to differentiate the failure + * cause when the buffer cannot be allocated. It is set to 0 if the connection + * buffer limit is reached. For fatal errors, its value is non-zero. + * + * Returns buffer pointer. May be NULL on allocation failure, in which case + * <err> will refer to the cause. + */ +struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err) +{ + struct qcc *qcc = qcs->qcc; + int buf_avail; + struct buffer *out = qc_stream_buf_get(qcs->stream); + + /* Stream must not try to reallocate a buffer if currently waiting for one. */ + BUG_ON(LIST_INLIST(&qcs->el_buf)); + + *err = 0; + + if (!out) { + if (qcc->flags & QC_CF_CONN_FULL) { + LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf); + goto out; + } + + out = qc_stream_buf_alloc(qcs->stream, qcs->tx.fc.off_real, + &buf_avail); + if (!out) { + if (buf_avail) { + TRACE_ERROR("stream desc alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs); + *err = 1; + goto out; + } + + TRACE_STATE("hitting stream desc buffer limit", QMUX_EV_QCS_SEND, qcc->conn, qcs); + LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf); + qcc->flags |= QC_CF_CONN_FULL; + goto out; + } + + if (!b_alloc(out, DB_MUX_TX)) { + TRACE_ERROR("buffer alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs); + *err = 1; + goto out; + } + } + + out: + return out; +} + +/* Returns total number of bytes not already sent to quic-conn layer. */ +static uint64_t qcs_prep_bytes(const struct qcs *qcs) +{ + struct buffer *out = qc_stream_buf_get(qcs->stream); + uint64_t diff, base_off; + + if (!out) + return 0; + + /* if ack_offset < buf_offset, it points to an older buffer. */ + base_off = MAX(qcs->stream->buf_offset, qcs->stream->ack_offset); + diff = qcs->tx.fc.off_real - base_off; + return b_data(out) - diff; +} + +/* Try to realign <out> buffer for <qcs> stream. This is done only if there is + * no data waiting for ACK. + * + * Returns 0 if realign was performed else non-zero. + */ +int qcc_realign_stream_txbuf(const struct qcs *qcs, struct buffer *out) +{ + if (qcs_prep_bytes(qcs) == b_data(out)) { + b_slow_realign(out, trash.area, b_data(out)); + return 0; + } + + return 1; +} + +/* Release the current <qcs> Tx buffer. This is useful if space left is not + * enough anymore. A new instance can then be allocated to continue sending. + * + * This operation fails if there is not yet sent bytes in the buffer. In this + * case, stream layer should interrupt sending until further notification. + * + * Returns 0 if buffer is released and a new one can be allocated or non-zero + * if there is still remaining data. + */ +int qcc_release_stream_txbuf(struct qcs *qcs) +{ + const uint64_t bytes = qcs_prep_bytes(qcs); + + /* Cannot release buffer if prepared data is not fully sent. */ + if (bytes) { + qcs->flags |= QC_SF_BLK_MROOM; + return 1; + } + + qc_stream_buf_release(qcs->stream); + return 0; +} + +/* Returns true if stream layer can proceed to emission via <qcs>. */ +int qcc_stream_can_send(const struct qcs *qcs) +{ + return !(qcs->flags & QC_SF_BLK_MROOM) && !LIST_INLIST(&qcs->el_buf); +} + +/* Wakes up every streams of <qcc> which are currently waiting for sending but + * are blocked on connection flow control. + */ +static void qcc_notify_fctl(struct qcc *qcc) +{ + struct qcs *qcs; + + while (!LIST_ISEMPTY(&qcc->fctl_list)) { + qcs = LIST_ELEM(qcc->fctl_list.n, struct qcs *, el_fctl); + LIST_DEL_INIT(&qcs->el_fctl); + qcs_notify_send(qcs); + } +} + /* Prepare for the emission of RESET_STREAM on <qcs> with error code <err>. */ void qcc_reset_stream(struct qcs *qcs, int err) { struct qcc *qcc = qcs->qcc; + const uint64_t diff = qcs_prep_bytes(qcs); if ((qcs->flags & QC_SF_TO_RESET) || qcs_is_close_local(qcs)) return; + /* TODO if QCS waiting for buffer, it could be removed from + * <qcc.buf_wait_list> if sending is closed now. + */ + TRACE_STATE("reset stream", QMUX_EV_QCS_END, qcc->conn, qcs); qcs->flags |= QC_SF_TO_RESET; qcs->err = err; - /* Remove prepared stream data from connection flow-control calcul. */ - if (qcs->tx.offset > qcs->tx.sent_offset) { - const uint64_t diff = qcs->tx.offset - qcs->tx.sent_offset; - BUG_ON(qcc->tx.offsets - diff < qcc->tx.sent_offsets); - qcc->tx.offsets -= diff; - /* Reset qcs offset to prevent BUG_ON() on qcs_destroy(). */ - qcs->tx.offset = qcs->tx.sent_offset; + if (diff) { + const int soft_blocked = qfctl_sblocked(&qcc->tx.fc); + + /* Soft offset cannot be inferior to real one. */ + BUG_ON(qcc->tx.fc.off_soft - diff < qcc->tx.fc.off_real); + + /* Subtract to conn flow control data amount prepared on stream not yet sent. */ + qcc->tx.fc.off_soft -= diff; + if (soft_blocked && !qfctl_sblocked(&qcc->tx.fc)) + qcc_notify_fctl(qcc); + + /* Reset QCS soft off to prevent BUG_ON() on qcs_destroy(). */ + qcs->tx.fc.off_soft = qcs->tx.fc.off_real; } /* Report send error to stream-endpoint layer. */ @@ -957,15 +1143,16 @@ void qcc_reset_stream(struct qcs *qcs, int err) qcs_alert(qcs); } - qcc_send_stream(qcs, 1); + qcc_send_stream(qcs, 1, 0); tasklet_wakeup(qcc->wait_event.tasklet); } /* Register <qcs> stream for emission of STREAM, STOP_SENDING or RESET_STREAM. * Set <urg> to 1 if stream content should be treated in priority compared to - * other streams. + * other streams. For STREAM emission, <count> must contains the size of the + * frame payload. This is used for flow control accounting. */ -void qcc_send_stream(struct qcs *qcs, int urg) +void qcc_send_stream(struct qcs *qcs, int urg, int count) { struct qcc *qcc = qcs->qcc; @@ -983,6 +1170,11 @@ void qcc_send_stream(struct qcs *qcs, int urg) LIST_APPEND(&qcs->qcc->send_list, &qcs->el_send); } + if (count) { + qfctl_sinc(&qcc->tx.fc, count); + qfctl_sinc(&qcs->tx.fc, count); + } + TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs); } @@ -999,7 +1191,7 @@ void qcc_abort_stream_read(struct qcs *qcs) TRACE_STATE("abort stream read", QMUX_EV_QCS_END, qcc->conn, qcs); qcs->flags |= (QC_SF_TO_STOP_SENDING|QC_SF_READ_ABORTED); - qcc_send_stream(qcs, 1); + qcc_send_stream(qcs, 1, 0); tasklet_wakeup(qcc->wait_event.tasklet); end: @@ -1203,17 +1395,19 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, */ int qcc_recv_max_data(struct qcc *qcc, uint64_t max) { + int unblock_soft = 0, unblock_real = 0; + TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn); TRACE_PROTO("receiving MAX_DATA", QMUX_EV_QCC_RECV, qcc->conn); - if (qcc->rfctl.md < max) { - qcc->rfctl.md = max; + if (qfctl_set_max(&qcc->tx.fc, max, &unblock_soft, &unblock_real)) { TRACE_DATA("increase remote max-data", QMUX_EV_QCC_RECV, qcc->conn); - if (qcc->flags & QC_CF_BLK_MFCTL) { - qcc->flags &= ~QC_CF_BLK_MFCTL; + if (unblock_real) tasklet_wakeup(qcc->wait_event.tasklet); - } + + if (unblock_soft) + qcc_notify_fctl(qcc); } TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn); @@ -1249,16 +1443,18 @@ int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max) goto err; if (qcs) { + int unblock_soft = 0, unblock_real = 0; + TRACE_PROTO("receiving MAX_STREAM_DATA", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs); - if (max > qcs->tx.msd) { - qcs->tx.msd = max; + if (qfctl_set_max(&qcs->tx.fc, max, &unblock_soft, &unblock_real)) { TRACE_DATA("increase remote max-stream-data", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs); - - if (qcs->flags & QC_SF_BLK_SFCTL) { - qcs->flags &= ~QC_SF_BLK_SFCTL; + if (unblock_real) { /* TODO optim: only wakeup IO-CB if stream has data to sent. */ tasklet_wakeup(qcc->wait_event.tasklet); } + + if (unblock_soft) + qcs_notify_send(qcs); } } @@ -1410,14 +1606,18 @@ int qcc_recv_stop_sending(struct qcc *qcc, uint64_t id, uint64_t err) } } - /* If FIN already reached, future RESET_STREAMS will be ignored. - * Manually set EOS in this case. - */ + /* Manually set EOS if FIN already reached as futures RESET_STREAM will be ignored in this case. */ if (qcs_sc(qcs) && se_fl_test(qcs->sd, SE_FL_EOI)) { se_fl_set(qcs->sd, SE_FL_EOS); qcs_alert(qcs); } + /* If not defined yet, set abort info for the sedesc */ + if (!qcs->sd->abort_info.info) { + qcs->sd->abort_info.info = (SE_ABRT_SRC_MUX_QUIC << SE_ABRT_SRC_SHIFT); + qcs->sd->abort_info.code = err; + } + /* RFC 9000 3.5. Solicited State Transitions * * An endpoint that receives a STOP_SENDING frame @@ -1500,12 +1700,12 @@ static void qcs_destroy(struct qcs *qcs) TRACE_ENTER(QMUX_EV_QCS_END, conn, qcs); - /* MUST not removed a stream with sending prepared data left. This is - * to ensure consistency on connection flow-control calculation. - */ - BUG_ON(qcs->tx.offset < qcs->tx.sent_offset); + if (!(qcc->flags & (QC_CF_ERR_CONN|QC_CF_ERRL))) { + /* MUST not removed a stream with sending prepared data left. This is + * to ensure consistency on connection flow-control calculation. + */ + BUG_ON(qcs->tx.fc.off_soft != qcs->tx.fc.off_real); - if (!(qcc->flags & QC_CF_ERRL)) { if (quic_stream_is_remote(qcc, id)) qcc_release_remote_stream(qcc, id); } @@ -1515,114 +1715,52 @@ static void qcs_destroy(struct qcs *qcs) TRACE_LEAVE(QMUX_EV_QCS_END, conn); } -/* Transfer as much as possible data on <qcs> from <in> to <out>. This is done - * in respect with available flow-control at stream and connection level. +/* Prepare a STREAM frame for <qcs> instance using <out> as payload. The frame + * is appended in <frm_list>. Set <fin> if this is supposed to be the last + * stream frame. If <out> is NULL an empty STREAM frame is built : this may be + * useful if FIN needs to be sent without any data left. Frame length will be + * truncated if greater than <fc_conn_wnd>. This allows to prepare several + * frames in a loop while respecting connection flow control window. * - * Returns the total bytes of transferred data or a negative error code. + * Returns the payload length of the STREAM frame or a negative error code. */ -static int qcs_xfer_data(struct qcs *qcs, struct buffer *out, struct buffer *in) +static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin, + struct list *frm_list, uint64_t window_conn) { struct qcc *qcc = qcs->qcc; - int left, to_xfer; - int total = 0; + struct quic_frame *frm; + const uint64_t window_stream = qfctl_rcap(&qcs->tx.fc); + const uint64_t bytes = qcs_prep_bytes(qcs); + uint64_t total; TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs); - if (!qcs_get_buf(qcs, out)) { - TRACE_ERROR("buffer alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs); - goto err; - } - - /* - * QCS out buffer diagram - * head left to_xfer - * -------------> ----------> -----> - * -------------------------------------------------- - * |...............|xxxxxxxxxxx|<<<<< - * -------------------------------------------------- - * ^ ack-off ^ sent-off ^ off - * - * STREAM frame - * ^ ^ - * |xxxxxxxxxxxxxxxxx| - */ - - BUG_ON_HOT(qcs->tx.sent_offset < qcs->stream->ack_offset); - BUG_ON_HOT(qcs->tx.offset < qcs->tx.sent_offset); - BUG_ON_HOT(qcc->tx.offsets < qcc->tx.sent_offsets); + /* This must only be called if there is data left, or at least a standalone FIN. */ + BUG_ON((!out || !b_data(out)) && !fin); - left = qcs->tx.offset - qcs->tx.sent_offset; - to_xfer = QUIC_MIN(b_data(in), b_room(out)); + total = bytes; - BUG_ON_HOT(qcs->tx.offset > qcs->tx.msd); - /* do not exceed flow control limit */ - if (qcs->tx.offset + to_xfer > qcs->tx.msd) { + /* do not exceed stream flow control limit */ + if (total > window_stream) { TRACE_DATA("do not exceed stream flow control", QMUX_EV_QCS_SEND, qcc->conn, qcs); - to_xfer = qcs->tx.msd - qcs->tx.offset; + total = window_stream; } - BUG_ON_HOT(qcc->tx.offsets > qcc->rfctl.md); - /* do not overcome flow control limit on connection */ - if (qcc->tx.offsets + to_xfer > qcc->rfctl.md) { + /* do not exceed connection flow control limit */ + if (total > window_conn) { TRACE_DATA("do not exceed conn flow control", QMUX_EV_QCS_SEND, qcc->conn, qcs); - to_xfer = qcc->rfctl.md - qcc->tx.offsets; + total = window_conn; } - if (!left && !to_xfer) - goto out; - - total = b_force_xfer(out, in, to_xfer); - - out: - { - struct qcs_xfer_data_trace_arg arg = { - .prep = b_data(out), .xfer = total, - }; - TRACE_LEAVE(QMUX_EV_QCS_SEND|QMUX_EV_QCS_XFER_DATA, - qcc->conn, qcs, &arg); - } - - return total; - - err: - TRACE_DEVEL("leaving on error", QMUX_EV_QCS_SEND, qcc->conn, qcs); - return -1; -} - -/* Prepare a STREAM frame for <qcs> instance using <out> as payload. The frame - * is appended in <frm_list>. Set <fin> if this is supposed to be the last - * stream frame. If <out> is NULL an empty STREAM frame is built : this may be - * useful if FIN needs to be sent without any data left. - * - * Returns the payload length of the STREAM frame or a negative error code. - */ -static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin, - struct list *frm_list) -{ - struct qcc *qcc = qcs->qcc; - struct quic_frame *frm; - int head, total; - uint64_t base_off; - - TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs); - - /* if ack_offset < buf_offset, it points to an older buffer. */ - base_off = MAX(qcs->stream->buf_offset, qcs->stream->ack_offset); - BUG_ON(qcs->tx.sent_offset < base_off); - - head = qcs->tx.sent_offset - base_off; - total = out ? b_data(out) - head : 0; - BUG_ON(total < 0); + /* Reset FIN if bytes to send is capped by flow control. */ + if (total < bytes) + fin = 0; if (!total && !fin) { /* No need to send anything if total is NULL and no FIN to signal. */ TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs); return 0; } - BUG_ON((!total && qcs->tx.sent_offset > qcs->tx.offset) || - (total && qcs->tx.sent_offset >= qcs->tx.offset)); - BUG_ON(qcs->tx.sent_offset + total > qcs->tx.offset); - BUG_ON(qcc->tx.sent_offsets + total > qcc->rfctl.md); TRACE_PROTO("sending STREAM frame", QMUX_EV_QCS_SEND, qcc->conn, qcs); frm = qc_frm_alloc(QUIC_FT_STREAM_8); @@ -1638,7 +1776,7 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin, if (total) { frm->stream.buf = out; - frm->stream.data = (unsigned char *)b_peek(out, head); + frm->stream.data = (unsigned char *)b_peek(out, b_data(out) - bytes); } else { /* Empty STREAM frame. */ @@ -1650,9 +1788,9 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin, if (fin) frm->type |= QUIC_STREAM_FRAME_TYPE_FIN_BIT; - if (qcs->tx.sent_offset) { + if (qcs->tx.fc.off_real) { frm->type |= QUIC_STREAM_FRAME_TYPE_OFF_BIT; - frm->stream.offset.key = qcs->tx.sent_offset; + frm->stream.offset.key = qcs->tx.fc.off_real; } /* Always set length bit as we do not know if there is remaining frames @@ -1680,23 +1818,6 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin, return -1; } -/* Check after transferring data from qcs.tx.buf if FIN must be set on the next - * STREAM frame for <qcs>. - * - * Returns true if FIN must be set else false. - */ -static int qcs_stream_fin(struct qcs *qcs) -{ - return qcs->flags & QC_SF_FIN_STREAM && !b_data(&qcs->tx.buf); -} - -/* Return true if <qcs> has data to send in new STREAM frames. */ -static forceinline int qcs_need_sending(struct qcs *qcs) -{ - return b_data(&qcs->tx.buf) || qcs->tx.sent_offset < qcs->tx.offset || - qcs_stream_fin(qcs); -} - /* This function must be called by the upper layer to inform about the sending * of a STREAM frame for <qcs> instance. The frame is of <data> length and on * <offset>. @@ -1708,42 +1829,45 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset) TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs); - BUG_ON(offset > qcs->tx.sent_offset); - BUG_ON(offset + data > qcs->tx.offset); + /* Real off MUST always be the greatest offset sent. */ + BUG_ON(offset > qcs->tx.fc.off_real); /* check if the STREAM frame has already been notified. It can happen * for retransmission. */ - if (offset + data < qcs->tx.sent_offset) { + if (offset + data < qcs->tx.fc.off_real) { TRACE_DEVEL("offset already notified", QMUX_EV_QCS_SEND, qcc->conn, qcs); goto out; } qcs_idle_open(qcs); - diff = offset + data - qcs->tx.sent_offset; + diff = offset + data - qcs->tx.fc.off_real; if (diff) { + struct quic_fctl *fc_conn = &qcc->tx.fc; + struct quic_fctl *fc_strm = &qcs->tx.fc; + + /* Ensure real offset never exceeds soft value. */ + BUG_ON(fc_conn->off_real + diff > fc_conn->off_soft); + BUG_ON(fc_strm->off_real + diff > fc_strm->off_soft); + /* increase offset sum on connection */ - qcc->tx.sent_offsets += diff; - BUG_ON_HOT(qcc->tx.sent_offsets > qcc->rfctl.md); - if (qcc->tx.sent_offsets == qcc->rfctl.md) { - qcc->flags |= QC_CF_BLK_MFCTL; - TRACE_STATE("connection flow-control reached", QMUX_EV_QCS_SEND, qcc->conn); + if (qfctl_rinc(fc_conn, diff)) { + TRACE_STATE("connection flow-control reached", + QMUX_EV_QCS_SEND, qcc->conn); } /* increase offset on stream */ - qcs->tx.sent_offset += diff; - BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.msd); - BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.offset); - if (qcs->tx.sent_offset == qcs->tx.msd) { - qcs->flags |= QC_SF_BLK_SFCTL; - TRACE_STATE("stream flow-control reached", QMUX_EV_QCS_SEND, qcc->conn, qcs); + if (qfctl_rinc(fc_strm, diff)) { + TRACE_STATE("stream flow-control reached", + QMUX_EV_QCS_SEND, qcc->conn, qcs); } - - /* If qcs.stream.buf is full, release it to the lower layer. */ - if (qcs->tx.offset == qcs->tx.sent_offset && - b_full(&qcs->stream->buf->buf)) { + /* Release buffer if everything sent and buf is full or stream is waiting for room. */ + if (!qcs_prep_bytes(qcs) && + (b_full(&qcs->stream->buf->buf) || qcs->flags & QC_SF_BLK_MROOM)) { qc_stream_buf_release(qcs->stream); + qcs->flags &= ~QC_SF_BLK_MROOM; + qcs_notify_send(qcs); } /* Add measurement for send rate. This is done at the MUX layer @@ -1752,7 +1876,7 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset) increment_send_rate(diff, 0); } - if (qcs->tx.offset == qcs->tx.sent_offset && !b_data(&qcs->tx.buf)) { + if (!qc_stream_buf_get(qcs->stream) || !qcs_prep_bytes(qcs)) { /* Remove stream from send_list if all was sent. */ LIST_DEL_INIT(&qcs->el_send); TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs); @@ -1842,7 +1966,7 @@ static int qcs_send_reset(struct qcs *qcs) frm->reset_stream.id = qcs->id; frm->reset_stream.app_error_code = qcs->err; - frm->reset_stream.final_size = qcs->tx.sent_offset; + frm->reset_stream.final_size = qcs->tx.fc.off_real; LIST_APPEND(&frms, &frm->list); if (qcc_send_frames(qcs->qcc, &frms)) { @@ -1910,87 +2034,46 @@ static int qcs_send_stop_sending(struct qcs *qcs) return 0; } -/* Used internally by qcc_io_send function. Proceed to send for <qcs>. This will - * transfer data from qcs buffer to its quic_stream counterpart. A STREAM frame - * is then generated and inserted in <frms> list. +/* Used internally by qcc_io_send function. Proceed to send for <qcs>. A STREAM + * frame is generated pointing to QCS stream descriptor content and inserted in + * <frms> list. Frame length will be truncated if greater than <window_conn>. + * This allows to prepare several frames in a loop while respecting connection + * flow control window. * - * Returns the total bytes transferred between qcs and quic_stream buffers. Can - * be null if out buffer cannot be allocated. On error a negative error code is - * used. + * Returns the payload length of the STREAM frame or a negative error code. */ -static int qcs_send(struct qcs *qcs, struct list *frms) +static int qcs_send(struct qcs *qcs, struct list *frms, uint64_t window_conn) { struct qcc *qcc = qcs->qcc; - struct buffer *buf = &qcs->tx.buf; struct buffer *out = qc_stream_buf_get(qcs->stream); - int xfer = 0, buf_avail; - char fin = 0; + int flen = 0; + const char fin = qcs->flags & QC_SF_FIN_STREAM; TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs); /* Cannot send STREAM on remote unidirectional streams. */ BUG_ON(quic_stream_is_uni(qcs->id) && quic_stream_is_remote(qcc, qcs->id)); - if (b_data(buf)) { - /* Allocate <out> buffer if not already done. */ - if (!out) { - if (qcc->flags & QC_CF_CONN_FULL) - goto out; - - out = qc_stream_buf_alloc(qcs->stream, qcs->tx.offset, - &buf_avail); - if (!out) { - if (buf_avail) { - TRACE_ERROR("stream desc alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs); - goto err; - } - - TRACE_STATE("hitting stream desc buffer limit", QMUX_EV_QCS_SEND, qcc->conn, qcs); - qcc->flags |= QC_CF_CONN_FULL; - goto out; - } - } - - /* Transfer data from <buf> to <out>. */ - xfer = qcs_xfer_data(qcs, out, buf); - if (xfer < 0) - goto err; - - if (xfer > 0) { - qcs_notify_send(qcs); - qcs->flags &= ~QC_SF_BLK_MROOM; - } + /* This function must not be called if there is nothing to send. */ + BUG_ON(!fin && !qcs_prep_bytes(qcs)); - qcs->tx.offset += xfer; - BUG_ON_HOT(qcs->tx.offset > qcs->tx.msd); - qcc->tx.offsets += xfer; - BUG_ON_HOT(qcc->tx.offsets > qcc->rfctl.md); - - /* out buffer cannot be emptied if qcs offsets differ. */ - BUG_ON(!b_data(out) && qcs->tx.sent_offset != qcs->tx.offset); + /* Skip STREAM frame allocation if already subscribed for send. + * Happens on sendto transient error or network congestion. + */ + if (qcc->wait_event.events & SUB_RETRY_SEND) { + TRACE_DEVEL("already subscribed for sending", + QMUX_EV_QCS_SEND, qcc->conn, qcs); + goto err; } - /* FIN is set if all incoming data were transferred. */ - fin = qcs_stream_fin(qcs); - /* Build a new STREAM frame with <out> buffer. */ - if (qcs->tx.sent_offset != qcs->tx.offset || fin) { - /* Skip STREAM frame allocation if already subscribed for send. - * Happens on sendto transient error or network congestion. - */ - if (qcc->wait_event.events & SUB_RETRY_SEND) { - TRACE_DEVEL("already subscribed for sending", - QMUX_EV_QCS_SEND, qcc->conn, qcs); - goto err; - } - - if (qcs_build_stream_frm(qcs, out, fin, frms) < 0) - goto err; - } + flen = qcs_build_stream_frm(qcs, out, fin, frms, window_conn); + if (flen < 0) + goto err; out: TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs); - return xfer; + return flen; err: TRACE_DEVEL("leaving on error", QMUX_EV_QCS_SEND, qcc->conn, qcs); @@ -2008,7 +2091,8 @@ static int qcc_io_send(struct qcc *qcc) /* Temporary list for QCS on error. */ struct list qcs_failed = LIST_HEAD_INIT(qcs_failed); struct qcs *qcs, *qcs_tmp, *first_qcs = NULL; - int ret, total = 0; + uint64_t window_conn = qfctl_rcap(&qcc->tx.fc); + int ret, total = 0, resent; TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn); @@ -2055,8 +2139,8 @@ static int qcc_io_send(struct qcc *qcc) break; /* Stream must not be present in send_list if it has nothing to send. */ - BUG_ON(!(qcs->flags & (QC_SF_TO_STOP_SENDING|QC_SF_TO_RESET)) && - !qcs_need_sending(qcs)); + BUG_ON(!(qcs->flags & (QC_SF_FIN_STREAM|QC_SF_TO_STOP_SENDING|QC_SF_TO_RESET)) && + (!qcs->stream || !qcs_prep_bytes(qcs))); /* Each STOP_SENDING/RESET_STREAM frame is sent individually to * guarantee its emission. @@ -2070,7 +2154,8 @@ static int qcc_io_send(struct qcc *qcc) /* Remove stream from send_list if it had only STOP_SENDING * to send. */ - if (!(qcs->flags & QC_SF_TO_RESET) && !qcs_need_sending(qcs)) { + if (!(qcs->flags & (QC_SF_FIN_STREAM|QC_SF_TO_RESET)) && + (!qcs->stream || !qcs_prep_bytes(qcs))) { LIST_DEL_INIT(&qcs->el_send); continue; } @@ -2091,9 +2176,12 @@ static int qcc_io_send(struct qcc *qcc) continue; } - if (!(qcc->flags & QC_CF_BLK_MFCTL) && - !(qcs->flags & QC_SF_BLK_SFCTL)) { - if ((ret = qcs_send(qcs, &frms)) < 0) { + /* Total sent bytes must not exceed connection window. */ + BUG_ON(total > window_conn); + + if (!qfctl_rblocked(&qcc->tx.fc) && + !qfctl_rblocked(&qcs->tx.fc) && window_conn > total) { + if ((ret = qcs_send(qcs, &frms, window_conn - total)) < 0) { /* Temporarily remove QCS from send-list. */ LIST_DEL_INIT(&qcs->el_send); LIST_APPEND(&qcs_failed, &qcs->el_send); @@ -2117,7 +2205,10 @@ static int qcc_io_send(struct qcc *qcc) /* Retry sending until no frame to send, data rejected or connection * flow-control limit reached. */ - while (qcc_send_frames(qcc, &frms) == 0 && !(qcc->flags & QC_CF_BLK_MFCTL)) { + while (qcc_send_frames(qcc, &frms) == 0 && !qfctl_rblocked(&qcc->tx.fc)) { + window_conn = qfctl_rcap(&qcc->tx.fc); + resent = 0; + /* Reloop over <qcc.send_list>. Useful for streams which have * fulfilled their qc_stream_desc buf and have now release it. */ @@ -2126,16 +2217,20 @@ static int qcc_io_send(struct qcc *qcc) * new qc_stream_desc should be present in send_list as * long as transport layer can handle all data. */ - BUG_ON(qcs->stream->buf && !(qcs->flags & QC_SF_BLK_SFCTL)); + BUG_ON(qcs->stream->buf && !qfctl_rblocked(&qcs->tx.fc)); + + /* Total sent bytes must not exceed connection window. */ + BUG_ON(resent > window_conn); - if (!(qcs->flags & QC_SF_BLK_SFCTL)) { - if ((ret = qcs_send(qcs, &frms)) < 0) { + if (!qfctl_rblocked(&qcs->tx.fc) && window_conn > resent) { + if ((ret = qcs_send(qcs, &frms, window_conn - resent)) < 0) { LIST_DEL_INIT(&qcs->el_send); LIST_APPEND(&qcs_failed, &qcs->el_send); continue; } total += ret; + resent += ret; } } } @@ -2156,7 +2251,7 @@ static int qcc_io_send(struct qcc *qcc) LIST_APPEND(&qcc->send_list, &qcs->el_send); } - if (!(qcc->flags & QC_CF_BLK_MFCTL)) + if (!qfctl_rblocked(&qcc->tx.fc)) tasklet_wakeup(qcc->wait_event.tasklet); } @@ -2276,7 +2371,7 @@ static void qcc_shutdown(struct qcc *qcc) qcc_io_send(qcc); } else { - qcc->err = quic_err_app(QC_ERR_NO_ERROR); + qcc->err = quic_err_transport(QC_ERR_NO_ERROR); } /* Register "no error" code at transport layer. Do not use @@ -2381,9 +2476,7 @@ static int qcc_io_process(struct qcc *qcc) return 0; } -/* release function. This one should be called to free all resources allocated - * to the mux. - */ +/* Free all resources allocated for <qcc> connection. */ static void qcc_release(struct qcc *qcc) { struct connection *conn = qcc->conn; @@ -2391,8 +2484,6 @@ static void qcc_release(struct qcc *qcc) TRACE_ENTER(QMUX_EV_QCC_END, conn); - qcc_shutdown(qcc); - if (qcc->task) { task_destroy(qcc->task); qcc->task = NULL; @@ -2465,6 +2556,7 @@ struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status) return NULL; release: + qcc_shutdown(qcc); qcc_release(qcc); TRACE_LEAVE(QMUX_EV_QCC_WAKE); return NULL; @@ -2507,6 +2599,7 @@ static struct task *qcc_timeout_task(struct task *t, void *ctx, unsigned int sta */ if (qcc_is_dead(qcc)) { TRACE_STATE("releasing dead connection", QMUX_EV_QCC_WAKE, qcc->conn); + qcc_shutdown(qcc); qcc_release(qcc); } @@ -2519,6 +2612,17 @@ static struct task *qcc_timeout_task(struct task *t, void *ctx, unsigned int sta return t; } +/* Minimal initialization of <qcc> members to use qcc_release() safely. */ +static void _qcc_init(struct qcc *qcc) +{ + qcc->conn = NULL; + qcc->task = NULL; + qcc->wait_event.tasklet = NULL; + qcc->app_ops = NULL; + qcc->streams_by_id = EB_ROOT_UNIQUE; + LIST_INIT(&qcc->lfctl.frms); +} + static int qmux_init(struct connection *conn, struct proxy *prx, struct session *sess, struct buffer *input) { @@ -2530,24 +2634,19 @@ static int qmux_init(struct connection *conn, struct proxy *prx, qcc = pool_alloc(pool_head_qcc); if (!qcc) { TRACE_ERROR("alloc failure", QMUX_EV_QCC_NEW); - goto fail_no_qcc; + goto err; } - qcc->conn = conn; + _qcc_init(qcc); conn->ctx = qcc; qcc->nb_hreq = qcc->nb_sc = 0; qcc->flags = 0; - - qcc->app_ops = NULL; - - qcc->streams_by_id = EB_ROOT_UNIQUE; + qcc->glitches = 0; + qcc->err = quic_err_transport(QC_ERR_NO_ERROR); /* Server parameters, params used for RX flow control. */ lparams = &conn->handle.qc->rx.params; - qcc->tx.sent_offsets = qcc->tx.offsets = 0; - - LIST_INIT(&qcc->lfctl.frms); qcc->lfctl.ms_bidi = qcc->lfctl.ms_bidi_init = lparams->initial_max_streams_bidi; qcc->lfctl.ms_uni = lparams->initial_max_streams_uni; qcc->lfctl.msd_bidi_l = lparams->initial_max_stream_data_bidi_local; @@ -2559,7 +2658,7 @@ static int qmux_init(struct connection *conn, struct proxy *prx, qcc->lfctl.offsets_recv = qcc->lfctl.offsets_consume = 0; rparams = &conn->handle.qc->tx.params; - qcc->rfctl.md = rparams->initial_max_data; + qfctl_init(&qcc->tx.fc, rparams->initial_max_data); qcc->rfctl.msd_bidi_l = rparams->initial_max_stream_data_bidi_local; qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote; qcc->rfctl.msd_uni_l = rparams->initial_max_stream_data_uni; @@ -2580,10 +2679,12 @@ static int qmux_init(struct connection *conn, struct proxy *prx, qcc->wait_event.tasklet = tasklet_new(); if (!qcc->wait_event.tasklet) { TRACE_ERROR("taslket alloc failure", QMUX_EV_QCC_NEW); - goto fail_no_tasklet; + goto err; } LIST_INIT(&qcc->send_list); + LIST_INIT(&qcc->fctl_list); + LIST_INIT(&qcc->buf_wait_list); qcc->wait_event.tasklet->process = qcc_io_cb; qcc->wait_event.tasklet->context = qcc; @@ -2591,7 +2692,7 @@ static int qmux_init(struct connection *conn, struct proxy *prx, qcc->proxy = prx; /* haproxy timeouts */ - if (conn_is_back(qcc->conn)) { + if (conn_is_back(conn)) { qcc->timeout = prx->timeout.server; qcc->shut_timeout = tick_isset(prx->timeout.serverfin) ? prx->timeout.serverfin : prx->timeout.server; @@ -2608,7 +2709,7 @@ static int qmux_init(struct connection *conn, struct proxy *prx, qcc->task = task_new_here(); if (!qcc->task) { TRACE_ERROR("timeout task alloc failure", QMUX_EV_QCC_NEW); - goto fail_no_timeout_task; + goto err; } qcc->task->process = qcc_timeout_task; qcc->task->context = qcc; @@ -2619,11 +2720,12 @@ static int qmux_init(struct connection *conn, struct proxy *prx, HA_ATOMIC_STORE(&conn->handle.qc->qcc, qcc); + /* Register conn as app_ops may use it. */ + qcc->conn = conn; + if (qcc_install_app_ops(qcc, conn->handle.qc->app_ops)) { - TRACE_PROTO("Cannot install app layer", QMUX_EV_QCC_NEW|QMUX_EV_QCC_ERR, qcc->conn); - /* prepare a CONNECTION_CLOSE frame */ - quic_set_connection_close(conn->handle.qc, quic_err_transport(QC_ERR_APPLICATION_ERROR)); - goto fail_install_app_ops; + TRACE_PROTO("Cannot install app layer", QMUX_EV_QCC_NEW|QMUX_EV_QCC_ERR, conn); + goto err; } if (qcc->app_ops == &h3_ops) @@ -2636,19 +2738,24 @@ static int qmux_init(struct connection *conn, struct proxy *prx, /* init read cycle */ tasklet_wakeup(qcc->wait_event.tasklet); - TRACE_LEAVE(QMUX_EV_QCC_NEW, qcc->conn); + TRACE_LEAVE(QMUX_EV_QCC_NEW, conn); return 0; - fail_install_app_ops: - if (qcc->app_ops && qcc->app_ops->release) - qcc->app_ops->release(qcc->ctx); - task_destroy(qcc->task); - fail_no_timeout_task: - tasklet_free(qcc->wait_event.tasklet); - fail_no_tasklet: - pool_free(pool_head_qcc, qcc); - fail_no_qcc: - TRACE_LEAVE(QMUX_EV_QCC_NEW); + err: + /* Prepare CONNECTION_CLOSE, using INTERNAL_ERROR as fallback code if unset. */ + if (!(conn->handle.qc->flags & QUIC_FL_CONN_IMMEDIATE_CLOSE)) { + struct quic_err err = qcc && qcc->err.code ? + qcc->err : quic_err_transport(QC_ERR_INTERNAL_ERROR); + quic_set_connection_close(conn->handle.qc, err); + } + + if (qcc) { + /* In case of MUX init failure, session will ensure connection is freed. */ + qcc->conn = NULL; + qcc_release(qcc); + } + + TRACE_DEVEL("leaving on error", QMUX_EV_QCC_NEW, conn); return -1; } @@ -2704,6 +2811,7 @@ static void qmux_strm_detach(struct sedesc *sd) return; release: + qcc_shutdown(qcc); qcc_release(qcc); TRACE_LEAVE(QMUX_EV_STRM_END); return; @@ -2786,11 +2894,18 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, int flags) { struct qcs *qcs = __sc_mux_strm(sc); + const size_t old_data = qcs_prep_bytes(qcs); size_t ret = 0; char fin; TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); + /* Stream must not be woken up if already waiting for conn buffer. */ + BUG_ON(LIST_INLIST(&qcs->el_buf)); + + /* Sending forbidden if QCS is locally closed (FIN or RESET_STREAM sent). */ + BUG_ON(qcs_is_close_local(qcs) || (qcs->flags & QC_SF_TO_RESET)); + /* stream layer has been detached so no transfer must occur after. */ BUG_ON_HOT(qcs->flags & QC_SF_DETACH); @@ -2801,8 +2916,20 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf, goto end; } - if (qcs_is_close_local(qcs) || (qcs->flags & QC_SF_TO_RESET)) { - ret = qcs_http_reset_buf(qcs, buf, count); + if (qfctl_sblocked(&qcs->qcc->tx.fc)) { + TRACE_DEVEL("leaving on connection flow control", + QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); + if (!LIST_INLIST(&qcs->el_fctl)) { + TRACE_DEVEL("append to fctl-list", + QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); + LIST_APPEND(&qcs->qcc->fctl_list, &qcs->el_fctl); + } + goto end; + } + + if (qfctl_sblocked(&qcs->tx.fc)) { + TRACE_DEVEL("leaving on flow-control reached", + QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); goto end; } @@ -2813,7 +2940,9 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf, } if (ret || fin) { - qcc_send_stream(qcs, 0); + const size_t data = qcs_prep_bytes(qcs) - old_data; + if (data || fin) + qcc_send_stream(qcs, 0, data); if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND)) tasklet_wakeup(qcs->qcc->wait_event.tasklet); } @@ -2825,18 +2954,25 @@ static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf, } -static size_t qmux_nego_ff(struct stconn *sc, struct buffer *input, size_t count, unsigned int may_splice) +static size_t qmux_strm_nego_ff(struct stconn *sc, struct buffer *input, + size_t count, unsigned int flags) { struct qcs *qcs = __sc_mux_strm(sc); size_t ret = 0; TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); + /* Stream must not be woken up if already waiting for conn buffer. */ + BUG_ON(LIST_INLIST(&qcs->el_buf)); + + /* Sending forbidden if QCS is locally closed (FIN or RESET_STREAM sent). */ + BUG_ON(qcs_is_close_local(qcs) || (qcs->flags & QC_SF_TO_RESET)); + /* stream layer has been detached so no transfer must occur after. */ BUG_ON_HOT(qcs->flags & QC_SF_DETACH); if (!qcs->qcc->app_ops->nego_ff || !qcs->qcc->app_ops->done_ff) { - /* Fast forwading is not supported by the QUIC application layer */ + /* Fast forwarding is not supported by the QUIC application layer */ qcs->sd->iobuf.flags |= IOBUF_FL_NO_FF; goto end; } @@ -2850,6 +2986,22 @@ static size_t qmux_nego_ff(struct stconn *sc, struct buffer *input, size_t count goto end; } + if (qfctl_sblocked(&qcs->qcc->tx.fc)) { + TRACE_DEVEL("leaving on connection flow control", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); + if (!LIST_INLIST(&qcs->el_fctl)) { + TRACE_DEVEL("append to fctl-list", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); + LIST_APPEND(&qcs->qcc->fctl_list, &qcs->el_fctl); + } + qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED; + goto end; + } + + if (qfctl_sblocked(&qcs->tx.fc)) { + TRACE_DEVEL("leaving on flow-control reached", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); + qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED; + goto end; + } + /* Alawys disable splicing */ qcs->sd->iobuf.flags |= IOBUF_FL_NO_SPLICING; @@ -2880,36 +3032,37 @@ static size_t qmux_nego_ff(struct stconn *sc, struct buffer *input, size_t count return ret; } -static size_t qmux_done_ff(struct stconn *sc) +static size_t qmux_strm_done_ff(struct stconn *sc) { struct qcs *qcs = __sc_mux_strm(sc); struct qcc *qcc = qcs->qcc; struct sedesc *sd = qcs->sd; - size_t total = 0; + size_t total = 0, data = sd->iobuf.data; TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); - if (sd->iobuf.flags & IOBUF_FL_EOI) + if (sd->iobuf.flags & IOBUF_FL_EOI) { + TRACE_STATE("reached stream fin", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); qcs->flags |= QC_SF_FIN_STREAM; + } if (!(qcs->flags & QC_SF_FIN_STREAM) && !sd->iobuf.data) goto end; + data += sd->iobuf.offset; total = qcs->qcc->app_ops->done_ff(qcs); - qcc_send_stream(qcs, 0); + if (data || qcs->flags & QC_SF_FIN_STREAM) + qcc_send_stream(qcs, 0, data); if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND)) tasklet_wakeup(qcc->wait_event.tasklet); end: - if (!b_data(&qcs->tx.buf)) - b_free(&qcs->tx.buf); - TRACE_LEAVE(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); return total; } -static int qmux_resume_ff(struct stconn *sc, unsigned int flags) +static int qmux_strm_resume_ff(struct stconn *sc, unsigned int flags) { return 0; } @@ -2962,16 +3115,20 @@ static int qmux_wake(struct connection *conn) return 0; release: + qcc_shutdown(qcc); qcc_release(qcc); TRACE_LEAVE(QMUX_EV_QCC_WAKE); return 1; } -static void qmux_strm_shutw(struct stconn *sc, enum co_shw_mode mode) +static void qmux_strm_shut(struct stconn *sc, enum se_shut_mode mode, struct se_abort_info *reason) { struct qcs *qcs = __sc_mux_strm(sc); struct qcc *qcc = qcs->qcc; + if (!(mode & (SE_SHW_SILENT|SE_SHW_NORMAL))) + return; + TRACE_ENTER(QMUX_EV_STRM_SHUT, qcc->conn, qcs); /* Early closure reported if QC_SF_FIN_STREAM not yet set. */ @@ -2984,7 +3141,7 @@ static void qmux_strm_shutw(struct stconn *sc, enum co_shw_mode mode) TRACE_STATE("set FIN STREAM", QMUX_EV_STRM_SHUT, qcc->conn, qcs); qcs->flags |= QC_SF_FIN_STREAM; - qcc_send_stream(qcs, 0); + qcc_send_stream(qcs, 0, 0); } } else { @@ -2999,6 +3156,34 @@ static void qmux_strm_shutw(struct stconn *sc, enum co_shw_mode mode) TRACE_LEAVE(QMUX_EV_STRM_SHUT, qcc->conn, qcs); } +static int qmux_ctl(struct connection *conn, enum mux_ctl_type mux_ctl, void *output) +{ + struct qcc *qcc = conn->ctx; + + switch (mux_ctl) { + case MUX_CTL_EXIT_STATUS: + return MUX_ES_UNKNOWN; + + case MUX_CTL_GET_GLITCHES: + return qcc->glitches; + + case MUX_CTL_GET_NBSTRM: { + struct qcs *qcs; + unsigned int nb_strm = qcc->nb_sc; + + list_for_each_entry(qcs, &qcc->opening_list, el_opening) + nb_strm++; + return nb_strm; + } + + case MUX_CTL_GET_MAXSTRM: + return qcc->lfctl.ms_bidi_init; + + default: + return -1; + } +} + static int qmux_sctl(struct stconn *sc, enum mux_sctl_type mux_sctl, void *output) { int ret = 0; @@ -3048,19 +3233,41 @@ static const struct mux_ops qmux_ops = { .detach = qmux_strm_detach, .rcv_buf = qmux_strm_rcv_buf, .snd_buf = qmux_strm_snd_buf, - .nego_fastfwd = qmux_nego_ff, - .done_fastfwd = qmux_done_ff, - .resume_fastfwd = qmux_resume_ff, + .nego_fastfwd = qmux_strm_nego_ff, + .done_fastfwd = qmux_strm_done_ff, + .resume_fastfwd = qmux_strm_resume_ff, .subscribe = qmux_strm_subscribe, .unsubscribe = qmux_strm_unsubscribe, .wake = qmux_wake, - .shutw = qmux_strm_shutw, + .shut = qmux_strm_shut, + .ctl = qmux_ctl, .sctl = qmux_sctl, .show_sd = qmux_strm_show_sd, .flags = MX_FL_HTX|MX_FL_NO_UPG|MX_FL_FRAMED, .name = "QUIC", }; +void qcc_show_quic(struct qcc *qcc) +{ + struct eb64_node *node; + chunk_appendf(&trash, " qcc=0x%p flags=0x%x sc=%llu hreq=%llu\n", + qcc, qcc->flags, (ullong)qcc->nb_sc, (ullong)qcc->nb_hreq); + + node = eb64_first(&qcc->streams_by_id); + while (node) { + struct qcs *qcs = eb64_entry(node, struct qcs, by_id); + chunk_appendf(&trash, " qcs=0x%p id=%llu flags=0x%x st=%s", + qcs, (ullong)qcs->id, qcs->flags, + qcs_st_to_str(qcs->st)); + if (!quic_stream_is_uni(qcs->id) || !quic_stream_is_local(qcc, qcs->id)) + chunk_appendf(&trash, " rxoff=%llu", (ullong)qcs->rx.offset); + if (!quic_stream_is_uni(qcs->id) || !quic_stream_is_remote(qcc, qcs->id)) + chunk_appendf(&trash, " txoff=%llu", (ullong)qcs->tx.fc.off_real); + chunk_appendf(&trash, "\n"); + node = eb64_next(node); + } +} + static struct mux_proto_list mux_proto_quic = { .token = IST("quic"), .mode = PROTO_MODE_HTTP, .side = PROTO_SIDE_FE, .mux = &qmux_ops }; |