summaryrefslogtreecommitdiffstats
path: root/src/quic_sock.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 12:18:05 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 12:18:05 +0000
commitb46aad6df449445a9fc4aa7b32bd40005438e3f7 (patch)
tree751aa858ca01f35de800164516b298887382919d /src/quic_sock.c
parentInitial commit. (diff)
downloadhaproxy-b46aad6df449445a9fc4aa7b32bd40005438e3f7.tar.xz
haproxy-b46aad6df449445a9fc4aa7b32bd40005438e3f7.zip
Adding upstream version 2.9.5.upstream/2.9.5
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--src/quic_sock.c1080
1 files changed, 1080 insertions, 0 deletions
diff --git a/src/quic_sock.c b/src/quic_sock.c
new file mode 100644
index 0000000..c479249
--- /dev/null
+++ b/src/quic_sock.c
@@ -0,0 +1,1080 @@
+/*
+ * QUIC socket management.
+ *
+ * Copyright 2020 HAProxy Technologies, Frederic Lecaille <flecaille@haproxy.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version
+ * 2 of the License, or (at your option) any later version.
+ *
+ */
+
+#define _GNU_SOURCE /* required for struct in6_pktinfo */
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+
+#include <haproxy/api.h>
+#include <haproxy/buf.h>
+#include <haproxy/connection.h>
+#include <haproxy/dynbuf.h>
+#include <haproxy/fd.h>
+#include <haproxy/global-t.h>
+#include <haproxy/list.h>
+#include <haproxy/listener.h>
+#include <haproxy/log.h>
+#include <haproxy/pool.h>
+#include <haproxy/proto_quic.h>
+#include <haproxy/proxy-t.h>
+#include <haproxy/quic_cid.h>
+#include <haproxy/quic_conn.h>
+#include <haproxy/quic_rx.h>
+#include <haproxy/quic_sock.h>
+#include <haproxy/quic_tp-t.h>
+#include <haproxy/quic_trace.h>
+#include <haproxy/session.h>
+#include <haproxy/stats-t.h>
+#include <haproxy/task.h>
+#include <haproxy/trace.h>
+#include <haproxy/tools.h>
+#include <haproxy/trace.h>
+
+/* Log only first EACCES bind() error runtime occurrence. */
+static volatile char quic_bind_eacces_warn = 0;
+
+/* Retrieve a connection's source address. Returns -1 on failure. */
+int quic_sock_get_src(struct connection *conn, struct sockaddr *addr, socklen_t len)
+{
+ struct quic_conn *qc;
+
+ if (!conn || !conn->handle.qc)
+ return -1;
+
+ qc = conn->handle.qc;
+ if (conn_is_back(conn)) {
+ /* no source address defined for outgoing connections for now */
+ return -1;
+ } else {
+ /* front connection, return the peer's address */
+ if (len > sizeof(qc->peer_addr))
+ len = sizeof(qc->peer_addr);
+ memcpy(addr, &qc->peer_addr, len);
+ return 0;
+ }
+}
+
+/* Retrieve a connection's destination address. Returns -1 on failure. */
+int quic_sock_get_dst(struct connection *conn, struct sockaddr *addr, socklen_t len)
+{
+ struct quic_conn *qc;
+
+ if (!conn || !conn->handle.qc)
+ return -1;
+
+ qc = conn->handle.qc;
+ if (conn_is_back(conn)) {
+ /* back connection, return the peer's address */
+ if (len > sizeof(qc->peer_addr))
+ len = sizeof(qc->peer_addr);
+ memcpy(addr, &qc->peer_addr, len);
+ } else {
+ struct sockaddr_storage *from;
+
+ /* Return listener address if IP_PKTINFO or friends are not
+ * supported by the socket.
+ */
+ BUG_ON(!qc->li);
+ from = is_addr(&qc->local_addr) ? &qc->local_addr :
+ &qc->li->rx.addr;
+ if (len > sizeof(*from))
+ len = sizeof(*from);
+ memcpy(addr, from, len);
+ }
+ return 0;
+}
+
+/*
+ * Inspired from session_accept_fd().
+ * Instantiate a new connection (connection struct) to be attached to <qc>
+ * QUIC connection of <l> listener.
+ * Returns 1 if succeeded, 0 if not.
+ */
+static int new_quic_cli_conn(struct quic_conn *qc, struct listener *l,
+ struct sockaddr_storage *saddr)
+{
+ struct connection *cli_conn;
+
+ if (unlikely((cli_conn = conn_new(&l->obj_type)) == NULL))
+ goto out;
+
+ if (!sockaddr_alloc(&cli_conn->src, saddr, sizeof *saddr))
+ goto out_free_conn;
+
+ cli_conn->flags |= CO_FL_FDLESS;
+ qc->conn = cli_conn;
+ cli_conn->handle.qc = qc;
+
+ cli_conn->target = &l->obj_type;
+
+ return 1;
+
+ out_free_conn:
+ qc->conn = NULL;
+ conn_stop_tracking(cli_conn);
+ conn_xprt_close(cli_conn);
+ conn_free(cli_conn);
+ out:
+
+ return 0;
+}
+
+/* Tests if the receiver supports accepting connections. Returns positive on
+ * success, 0 if not possible
+ */
+int quic_sock_accepting_conn(const struct receiver *rx)
+{
+ return 1;
+}
+
+/* Accept an incoming connection from listener <l>, and return it, as well as
+ * a CO_AC_* status code into <status> if not null. Null is returned on error.
+ * <l> must be a valid listener with a valid frontend.
+ */
+struct connection *quic_sock_accept_conn(struct listener *l, int *status)
+{
+ struct quic_conn *qc;
+ struct li_per_thread *lthr = &l->per_thr[ti->ltid];
+
+ qc = MT_LIST_POP(&lthr->quic_accept.conns, struct quic_conn *, accept_list);
+ if (!qc || qc->flags & (QUIC_FL_CONN_CLOSING|QUIC_FL_CONN_DRAINING))
+ goto done;
+
+ if (!new_quic_cli_conn(qc, l, &qc->peer_addr))
+ goto err;
+
+ done:
+ *status = CO_AC_DONE;
+
+ if (qc) {
+ BUG_ON(l->rx.quic_curr_accept <= 0);
+ HA_ATOMIC_DEC(&l->rx.quic_curr_accept);
+ return qc->conn;
+ }
+ else {
+ return NULL;
+ }
+
+ err:
+ /* in case of error reinsert the element to process it later. */
+ MT_LIST_INSERT(&lthr->quic_accept.conns, &qc->accept_list);
+
+ *status = CO_AC_PAUSE;
+ return NULL;
+}
+
+/* QUIC datagrams handler task. */
+struct task *quic_lstnr_dghdlr(struct task *t, void *ctx, unsigned int state)
+{
+ struct quic_dghdlr *dghdlr = ctx;
+ struct quic_dgram *dgram;
+ int max_dgrams = global.tune.maxpollevents;
+
+ TRACE_ENTER(QUIC_EV_CONN_LPKT);
+
+ while ((dgram = MT_LIST_POP(&dghdlr->dgrams, typeof(dgram), handler_list))) {
+ if (quic_dgram_parse(dgram, NULL, dgram->owner)) {
+ /* TODO should we requeue the datagram ? */
+ break;
+ }
+
+ if (--max_dgrams <= 0)
+ goto stop_here;
+ }
+
+ TRACE_LEAVE(QUIC_EV_CONN_LPKT);
+ return t;
+
+ stop_here:
+ /* too much work done at once, come back here later */
+ if (!MT_LIST_ISEMPTY(&dghdlr->dgrams))
+ tasklet_wakeup((struct tasklet *)t);
+
+ TRACE_LEAVE(QUIC_EV_CONN_LPKT);
+ return t;
+}
+
+/* Retrieve the DCID from a QUIC datagram or packet at <pos> position,
+ * <end> being at one byte past the end of this datagram.
+ * Returns 1 if succeeded, 0 if not.
+ */
+static int quic_get_dgram_dcid(unsigned char *pos, const unsigned char *end,
+ unsigned char **dcid, size_t *dcid_len)
+{
+ int ret = 0, long_header;
+ size_t minlen, skip;
+
+ TRACE_ENTER(QUIC_EV_CONN_RXPKT);
+
+ if (!(*pos & QUIC_PACKET_FIXED_BIT)) {
+ TRACE_PROTO("fixed bit not set", QUIC_EV_CONN_RXPKT);
+ goto err;
+ }
+
+ long_header = *pos & QUIC_PACKET_LONG_HEADER_BIT;
+ minlen = long_header ? QUIC_LONG_PACKET_MINLEN :
+ QUIC_SHORT_PACKET_MINLEN + QUIC_HAP_CID_LEN + QUIC_TLS_TAG_LEN;
+ skip = long_header ? QUIC_LONG_PACKET_DCID_OFF : QUIC_SHORT_PACKET_DCID_OFF;
+ if (end - pos < minlen)
+ goto err;
+
+ pos += skip;
+ *dcid_len = long_header ? *pos++ : QUIC_HAP_CID_LEN;
+ if (*dcid_len > QUIC_CID_MAXLEN || end - pos <= *dcid_len)
+ goto err;
+
+ *dcid = pos;
+
+ ret = 1;
+ leave:
+ TRACE_LEAVE(QUIC_EV_CONN_RXPKT);
+ return ret;
+
+ err:
+ TRACE_PROTO("wrong datagram", QUIC_EV_CONN_RXPKT);
+ goto leave;
+}
+
+
+/* Retrieve the DCID from the datagram found at <pos> position and deliver it to the
+ * correct datagram handler.
+ * Return 1 if a correct datagram could be found, 0 if not.
+ */
+static int quic_lstnr_dgram_dispatch(unsigned char *pos, size_t len, void *owner,
+ struct sockaddr_storage *saddr,
+ struct sockaddr_storage *daddr,
+ struct quic_dgram *new_dgram, struct list *dgrams)
+{
+ struct quic_dgram *dgram;
+ unsigned char *dcid;
+ size_t dcid_len;
+ int cid_tid;
+
+ if (!len || !quic_get_dgram_dcid(pos, pos + len, &dcid, &dcid_len))
+ goto err;
+
+ dgram = new_dgram ? new_dgram : pool_alloc(pool_head_quic_dgram);
+ if (!dgram)
+ goto err;
+
+ if ((cid_tid = quic_get_cid_tid(dcid, dcid_len, saddr, pos, len)) < 0) {
+ /* Use the current thread if CID not found. If a clients opens
+ * a connection with multiple packets, it is possible that
+ * several threads will deal with datagrams sharing the same
+ * CID. For this reason, the CID tree insertion will be
+ * conducted as an atomic operation and the datagram ultimately
+ * redispatch by the late thread.
+ */
+ cid_tid = tid;
+ }
+
+ /* All the members must be initialized! */
+ dgram->owner = owner;
+ dgram->buf = pos;
+ dgram->len = len;
+ dgram->dcid = dcid;
+ dgram->dcid_len = dcid_len;
+ dgram->saddr = *saddr;
+ dgram->daddr = *daddr;
+ dgram->qc = NULL;
+
+ /* Attached datagram to its quic_receiver_buf and quic_dghdlrs. */
+ LIST_APPEND(dgrams, &dgram->recv_list);
+ MT_LIST_APPEND(&quic_dghdlrs[cid_tid].dgrams, &dgram->handler_list);
+
+ /* typically quic_lstnr_dghdlr() */
+ tasklet_wakeup(quic_dghdlrs[cid_tid].task);
+
+ return 1;
+
+ err:
+ pool_free(pool_head_quic_dgram, new_dgram);
+ return 0;
+}
+
+/* This function is responsible to remove unused datagram attached in front of
+ * <buf>. Each instances will be freed until a not yet consumed datagram is
+ * found or end of the list is hit. The last unused datagram found is not freed
+ * and is instead returned so that the caller can reuse it if needed.
+ *
+ * Returns the last unused datagram or NULL if no occurrence found.
+ */
+static struct quic_dgram *quic_rxbuf_purge_dgrams(struct quic_receiver_buf *rbuf)
+{
+ struct quic_dgram *cur, *prev = NULL;
+
+ while (!LIST_ISEMPTY(&rbuf->dgram_list)) {
+ cur = LIST_ELEM(rbuf->dgram_list.n, struct quic_dgram *, recv_list);
+
+ /* Loop until a not yet consumed datagram is found. */
+ if (HA_ATOMIC_LOAD(&cur->buf))
+ break;
+
+ /* Clear buffer of current unused datagram. */
+ LIST_DELETE(&cur->recv_list);
+ b_del(&rbuf->buf, cur->len);
+
+ /* Free last found unused datagram. */
+ pool_free(pool_head_quic_dgram, prev);
+ prev = cur;
+ }
+
+ /* Return last unused datagram found. */
+ return prev;
+}
+
+/* Receive data from datagram socket <fd>. Data are placed in <out> buffer of
+ * length <len>.
+ *
+ * Datagram addresses will be returned via the next arguments. <from> will be
+ * the peer address and <to> the reception one. Note that <to> can only be
+ * retrieved if the socket supports IP_PKTINFO or affiliated options. If not,
+ * <to> will be set as AF_UNSPEC. The caller must specify <to_port> to ensure
+ * that <to> address is completely filled.
+ *
+ * Returns value from recvmsg syscall.
+ */
+static ssize_t quic_recv(int fd, void *out, size_t len,
+ struct sockaddr *from, socklen_t from_len,
+ struct sockaddr *to, socklen_t to_len,
+ uint16_t dst_port)
+{
+ union pktinfo {
+#ifdef IP_PKTINFO
+ struct in_pktinfo in;
+#else /* !IP_PKTINFO */
+ struct in_addr addr;
+#endif
+#ifdef IPV6_RECVPKTINFO
+ struct in6_pktinfo in6;
+#endif
+ };
+ char cdata[CMSG_SPACE(sizeof(union pktinfo))];
+ struct msghdr msg;
+ struct iovec vec;
+ struct cmsghdr *cmsg;
+ ssize_t ret;
+
+ vec.iov_base = out;
+ vec.iov_len = len;
+
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_name = from;
+ msg.msg_namelen = from_len;
+ msg.msg_iov = &vec;
+ msg.msg_iovlen = 1;
+ msg.msg_control = &cdata;
+ msg.msg_controllen = sizeof(cdata);
+
+ clear_addr((struct sockaddr_storage *)to);
+
+ do {
+ ret = recvmsg(fd, &msg, 0);
+ } while (ret < 0 && errno == EINTR);
+
+ /* TODO handle errno. On EAGAIN/EWOULDBLOCK use fd_cant_recv() if
+ * using dedicated connection socket.
+ */
+
+ if (ret < 0)
+ goto end;
+
+ for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+ switch (cmsg->cmsg_level) {
+ case IPPROTO_IP:
+#if defined(IP_PKTINFO)
+ if (cmsg->cmsg_type == IP_PKTINFO) {
+ struct sockaddr_in *in = (struct sockaddr_in *)to;
+ struct in_pktinfo *info = (struct in_pktinfo *)CMSG_DATA(cmsg);
+
+ if (to_len >= sizeof(struct sockaddr_in)) {
+ in->sin_family = AF_INET;
+ in->sin_addr = info->ipi_addr;
+ in->sin_port = dst_port;
+ }
+ }
+#elif defined(IP_RECVDSTADDR)
+ if (cmsg->cmsg_type == IP_RECVDSTADDR) {
+ struct sockaddr_in *in = (struct sockaddr_in *)to;
+ struct in_addr *info = (struct in_addr *)CMSG_DATA(cmsg);
+
+ if (to_len >= sizeof(struct sockaddr_in)) {
+ in->sin_family = AF_INET;
+ in->sin_addr.s_addr = info->s_addr;
+ in->sin_port = dst_port;
+ }
+ }
+#endif /* IP_PKTINFO || IP_RECVDSTADDR */
+ break;
+
+ case IPPROTO_IPV6:
+#ifdef IPV6_RECVPKTINFO
+ if (cmsg->cmsg_type == IPV6_PKTINFO) {
+ struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)to;
+ struct in6_pktinfo *info6 = (struct in6_pktinfo *)CMSG_DATA(cmsg);
+
+ if (to_len >= sizeof(struct sockaddr_in6)) {
+ in6->sin6_family = AF_INET6;
+ memcpy(&in6->sin6_addr, &info6->ipi6_addr, sizeof(in6->sin6_addr));
+ in6->sin6_port = dst_port;
+ }
+ }
+#endif
+ break;
+ }
+ }
+
+ end:
+ return ret;
+}
+
+/* Function called on a read event from a listening socket. It tries
+ * to handle as many connections as possible.
+ */
+void quic_lstnr_sock_fd_iocb(int fd)
+{
+ ssize_t ret;
+ struct quic_receiver_buf *rxbuf;
+ struct buffer *buf;
+ struct listener *l = objt_listener(fdtab[fd].owner);
+ struct quic_transport_params *params;
+ /* Source address */
+ struct sockaddr_storage saddr = {0}, daddr = {0};
+ size_t max_sz, cspace;
+ struct quic_dgram *new_dgram;
+ unsigned char *dgram_buf;
+ int max_dgrams;
+
+ BUG_ON(!l);
+
+ new_dgram = NULL;
+ if (!l)
+ return;
+
+ if (!(fdtab[fd].state & FD_POLL_IN) || !fd_recv_ready(fd))
+ return;
+
+ rxbuf = MT_LIST_POP(&l->rx.rxbuf_list, typeof(rxbuf), rxbuf_el);
+ if (!rxbuf)
+ goto out;
+
+ buf = &rxbuf->buf;
+
+ max_dgrams = global.tune.maxpollevents;
+ start:
+ /* Try to reuse an existing dgram. Note that there is always at
+ * least one datagram to pick, except the first time we enter
+ * this function for this <rxbuf> buffer.
+ */
+ new_dgram = quic_rxbuf_purge_dgrams(rxbuf);
+
+ params = &l->bind_conf->quic_params;
+ max_sz = params->max_udp_payload_size;
+ cspace = b_contig_space(buf);
+ if (cspace < max_sz) {
+ struct proxy *px = l->bind_conf->frontend;
+ struct quic_counters *prx_counters = EXTRA_COUNTERS_GET(px->extra_counters_fe, &quic_stats_module);
+ struct quic_dgram *dgram;
+
+ /* Do no mark <buf> as full, and do not try to consume it
+ * if the contiguous remaining space is not at the end
+ */
+ if (b_tail(buf) + cspace < b_wrap(buf)) {
+ HA_ATOMIC_INC(&prx_counters->rxbuf_full);
+ goto out;
+ }
+
+ /* Allocate a fake datagram, without data to locate
+ * the end of the RX buffer (required during purging).
+ */
+ dgram = pool_alloc(pool_head_quic_dgram);
+ if (!dgram)
+ goto out;
+
+ /* Initialize only the useful members of this fake datagram. */
+ dgram->buf = NULL;
+ dgram->len = cspace;
+ /* Append this datagram only to the RX buffer list. It will
+ * not be treated by any datagram handler.
+ */
+ LIST_APPEND(&rxbuf->dgram_list, &dgram->recv_list);
+
+ /* Consume the remaining space */
+ b_add(buf, cspace);
+ if (b_contig_space(buf) < max_sz) {
+ HA_ATOMIC_INC(&prx_counters->rxbuf_full);
+ goto out;
+ }
+ }
+
+ dgram_buf = (unsigned char *)b_tail(buf);
+ ret = quic_recv(fd, dgram_buf, max_sz,
+ (struct sockaddr *)&saddr, sizeof(saddr),
+ (struct sockaddr *)&daddr, sizeof(daddr),
+ get_net_port(&l->rx.addr));
+ if (ret <= 0)
+ goto out;
+
+ b_add(buf, ret);
+ if (!quic_lstnr_dgram_dispatch(dgram_buf, ret, l, &saddr, &daddr,
+ new_dgram, &rxbuf->dgram_list)) {
+ /* If wrong, consume this datagram */
+ b_sub(buf, ret);
+ }
+ new_dgram = NULL;
+ if (--max_dgrams > 0)
+ goto start;
+ out:
+ pool_free(pool_head_quic_dgram, new_dgram);
+ MT_LIST_APPEND(&l->rx.rxbuf_list, &rxbuf->rxbuf_el);
+}
+
+/* FD-owned quic-conn socket callback. */
+void quic_conn_sock_fd_iocb(int fd)
+{
+ struct quic_conn *qc = fdtab[fd].owner;
+
+ TRACE_ENTER(QUIC_EV_CONN_RCV, qc);
+
+ if (fd_send_active(fd) && fd_send_ready(fd)) {
+ TRACE_DEVEL("send ready", QUIC_EV_CONN_RCV, qc);
+ fd_stop_send(fd);
+ tasklet_wakeup_after(NULL, qc->wait_event.tasklet);
+ qc_notify_send(qc);
+ }
+
+ if (fd_recv_ready(fd)) {
+ TRACE_DEVEL("recv ready", QUIC_EV_CONN_RCV, qc);
+ tasklet_wakeup_after(NULL, qc->wait_event.tasklet);
+ fd_stop_recv(fd);
+ }
+
+ TRACE_LEAVE(QUIC_EV_CONN_RCV, qc);
+}
+
+/* Send a datagram stored into <buf> buffer with <sz> as size.
+ * The caller must ensure there is at least <sz> bytes in this buffer.
+ *
+ * Returns the total bytes sent over the socket. 0 is returned if a transient
+ * error is encountered which allows send to be retry later. A negative value
+ * is used for a fatal error which guarantee that all future send operation for
+ * this connection will fail.
+ *
+ * TODO standardize this function for a generic UDP sendto wrapper. This can be
+ * done by removing the <qc> arg and replace it with address/port.
+ */
+int qc_snd_buf(struct quic_conn *qc, const struct buffer *buf, size_t sz,
+ int flags)
+{
+ ssize_t ret;
+
+ do {
+ if (qc_test_fd(qc)) {
+ if (!fd_send_ready(qc->fd))
+ return 0;
+
+ ret = send(qc->fd, b_peek(buf, b_head_ofs(buf)), sz,
+ MSG_DONTWAIT | MSG_NOSIGNAL);
+ }
+#if defined(IP_PKTINFO) || defined(IP_RECVDSTADDR) || defined(IPV6_RECVPKTINFO)
+ else if (is_addr(&qc->local_addr)) {
+ struct msghdr msg = { 0 };
+ struct iovec vec;
+ struct cmsghdr *cmsg;
+#ifdef IP_PKTINFO
+ struct in_pktinfo in;
+#endif /* IP_PKTINFO */
+#ifdef IPV6_RECVPKTINFO
+ struct in6_pktinfo in6;
+#endif /* IPV6_RECVPKTINFO */
+ union {
+#ifdef IP_PKTINFO
+ char buf[CMSG_SPACE(sizeof(in))];
+#endif /* IP_PKTINFO */
+#ifdef IPV6_RECVPKTINFO
+ char buf6[CMSG_SPACE(sizeof(in6))];
+#endif /* IPV6_RECVPKTINFO */
+ char bufaddr[CMSG_SPACE(sizeof(struct in_addr))];
+ struct cmsghdr align;
+ } u;
+
+ vec.iov_base = b_peek(buf, b_head_ofs(buf));
+ vec.iov_len = sz;
+ msg.msg_name = &qc->peer_addr;
+ msg.msg_namelen = get_addr_len(&qc->peer_addr);
+ msg.msg_iov = &vec;
+ msg.msg_iovlen = 1;
+
+ switch (qc->local_addr.ss_family) {
+ case AF_INET:
+#if defined(IP_PKTINFO)
+ memset(&in, 0, sizeof(in));
+ memcpy(&in.ipi_spec_dst,
+ &((struct sockaddr_in *)&qc->local_addr)->sin_addr,
+ sizeof(struct in_addr));
+
+ msg.msg_control = u.buf;
+ msg.msg_controllen = sizeof(u.buf);
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+ cmsg->cmsg_level = IPPROTO_IP;
+ cmsg->cmsg_type = IP_PKTINFO;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo));
+ memcpy(CMSG_DATA(cmsg), &in, sizeof(in));
+#elif defined(IP_RECVDSTADDR)
+ msg.msg_control = u.bufaddr;
+ msg.msg_controllen = sizeof(u.bufaddr);
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+ cmsg->cmsg_level = IPPROTO_IP;
+ cmsg->cmsg_type = IP_SENDSRCADDR;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(struct in_addr));
+ memcpy(CMSG_DATA(cmsg),
+ &((struct sockaddr_in *)&qc->local_addr)->sin_addr,
+ sizeof(struct in_addr));
+#endif /* IP_PKTINFO || IP_RECVDSTADDR */
+ break;
+
+ case AF_INET6:
+#ifdef IPV6_RECVPKTINFO
+ memset(&in6, 0, sizeof(in6));
+ memcpy(&in6.ipi6_addr,
+ &((struct sockaddr_in6 *)&qc->local_addr)->sin6_addr,
+ sizeof(struct in6_addr));
+
+ msg.msg_control = u.buf6;
+ msg.msg_controllen = sizeof(u.buf6);
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+ cmsg->cmsg_level = IPPROTO_IPV6;
+ cmsg->cmsg_type = IPV6_PKTINFO;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(struct in6_pktinfo));
+ memcpy(CMSG_DATA(cmsg), &in6, sizeof(in6));
+#endif /* IPV6_RECVPKTINFO */
+ break;
+
+ default:
+ break;
+ }
+
+ ret = sendmsg(qc->li->rx.fd, &msg,
+ MSG_DONTWAIT|MSG_NOSIGNAL);
+ }
+#endif /* IP_PKTINFO || IP_RECVDSTADDR || IPV6_RECVPKTINFO */
+ else {
+ ret = sendto(qc->li->rx.fd, b_peek(buf, b_head_ofs(buf)), sz,
+ MSG_DONTWAIT|MSG_NOSIGNAL,
+ (struct sockaddr *)&qc->peer_addr,
+ get_addr_len(&qc->peer_addr));
+ }
+ } while (ret < 0 && errno == EINTR);
+
+ if (ret < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK ||
+ errno == ENOTCONN || errno == EINPROGRESS) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ qc->cntrs.socket_full++;
+ else
+ qc->cntrs.sendto_err++;
+
+ /* transient error */
+ fd_want_send(qc->fd);
+ fd_cant_send(qc->fd);
+ TRACE_PRINTF(TRACE_LEVEL_USER, QUIC_EV_CONN_SPPKTS, qc, 0, 0, 0,
+ "UDP send failure errno=%d (%s)", errno, strerror(errno));
+ return 0;
+ }
+ else {
+ /* unrecoverable error */
+ qc->cntrs.sendto_err_unknown++;
+ TRACE_PRINTF(TRACE_LEVEL_USER, QUIC_EV_CONN_SPPKTS, qc, 0, 0, 0,
+ "UDP send failure errno=%d (%s)", errno, strerror(errno));
+ return -1;
+ }
+ }
+
+ if (ret != sz)
+ return 0;
+
+ return ret;
+}
+
+/* Receive datagram on <qc> FD-owned socket.
+ *
+ * Returns the total number of bytes read or a negative value on error.
+ */
+int qc_rcv_buf(struct quic_conn *qc)
+{
+ struct sockaddr_storage saddr = {0}, daddr = {0};
+ struct quic_transport_params *params;
+ struct quic_dgram *new_dgram = NULL;
+ struct buffer buf = BUF_NULL;
+ size_t max_sz;
+ unsigned char *dgram_buf;
+ struct listener *l;
+ ssize_t ret = 0;
+
+ /* Do not call this if quic-conn FD is uninitialized. */
+ BUG_ON(qc->fd < 0);
+
+ TRACE_ENTER(QUIC_EV_CONN_RCV, qc);
+ l = qc->li;
+
+ params = &l->bind_conf->quic_params;
+ max_sz = params->max_udp_payload_size;
+
+ do {
+ if (!b_alloc(&buf))
+ break; /* TODO subscribe for memory again available. */
+
+ b_reset(&buf);
+ BUG_ON(b_contig_space(&buf) < max_sz);
+
+ /* Allocate datagram on first loop or after requeuing. */
+ if (!new_dgram && !(new_dgram = pool_alloc(pool_head_quic_dgram)))
+ break; /* TODO subscribe for memory again available. */
+
+ dgram_buf = (unsigned char *)b_tail(&buf);
+ ret = quic_recv(qc->fd, dgram_buf, max_sz,
+ (struct sockaddr *)&saddr, sizeof(saddr),
+ (struct sockaddr *)&daddr, sizeof(daddr),
+ get_net_port(&qc->local_addr));
+ if (ret <= 0) {
+ /* Subscribe FD for future reception. */
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOTCONN)
+ fd_want_recv(qc->fd);
+ /* TODO handle other error codes as fatal on the connection. */
+ break;
+ }
+
+ b_add(&buf, ret);
+
+ new_dgram->buf = dgram_buf;
+ new_dgram->len = ret;
+ new_dgram->dcid_len = 0;
+ new_dgram->dcid = NULL;
+ new_dgram->saddr = saddr;
+ new_dgram->daddr = daddr;
+ new_dgram->qc = NULL; /* set later via quic_dgram_parse() */
+
+ TRACE_DEVEL("read datagram", QUIC_EV_CONN_RCV, qc, new_dgram);
+
+ if (!quic_get_dgram_dcid(new_dgram->buf,
+ new_dgram->buf + new_dgram->len,
+ &new_dgram->dcid, &new_dgram->dcid_len)) {
+ continue;
+ }
+
+ if (!qc_check_dcid(qc, new_dgram->dcid, new_dgram->dcid_len)) {
+ /* Datagram received by error on the connection FD, dispatch it
+ * to its associated quic-conn.
+ *
+ * TODO count redispatch datagrams.
+ */
+ struct quic_receiver_buf *rxbuf;
+ struct quic_dgram *tmp_dgram;
+ unsigned char *rxbuf_tail;
+ size_t cspace;
+
+ TRACE_STATE("datagram for other connection on quic-conn socket, requeue it", QUIC_EV_CONN_RCV, qc);
+
+ rxbuf = MT_LIST_POP(&l->rx.rxbuf_list, typeof(rxbuf), rxbuf_el);
+ ALREADY_CHECKED(rxbuf);
+ cspace = b_contig_space(&rxbuf->buf);
+
+ tmp_dgram = quic_rxbuf_purge_dgrams(rxbuf);
+ pool_free(pool_head_quic_dgram, tmp_dgram);
+
+ /* Insert a fake datagram if space wraps to consume it. */
+ if (cspace < new_dgram->len && b_space_wraps(&rxbuf->buf)) {
+ struct quic_dgram *fake_dgram = pool_alloc(pool_head_quic_dgram);
+ if (!fake_dgram) {
+ /* TODO count lost datagrams */
+ MT_LIST_APPEND(&l->rx.rxbuf_list, &rxbuf->rxbuf_el);
+ continue;
+ }
+
+ fake_dgram->buf = NULL;
+ fake_dgram->len = cspace;
+ LIST_APPEND(&rxbuf->dgram_list, &fake_dgram->recv_list);
+ b_add(&rxbuf->buf, cspace);
+ }
+
+ /* Recheck contig space after fake datagram insert. */
+ if (b_contig_space(&rxbuf->buf) < new_dgram->len) {
+ /* TODO count lost datagrams */
+ MT_LIST_APPEND(&l->rx.rxbuf_list, &rxbuf->rxbuf_el);
+ continue;
+ }
+
+ rxbuf_tail = (unsigned char *)b_tail(&rxbuf->buf);
+ __b_putblk(&rxbuf->buf, (char *)dgram_buf, new_dgram->len);
+ if (!quic_lstnr_dgram_dispatch(rxbuf_tail, ret, l, &saddr, &daddr,
+ new_dgram, &rxbuf->dgram_list)) {
+ /* TODO count lost datagrams. */
+ b_sub(&buf, ret);
+ }
+ else {
+ /* datagram must not be freed as it was requeued. */
+ new_dgram = NULL;
+ }
+
+ MT_LIST_APPEND(&l->rx.rxbuf_list, &rxbuf->rxbuf_el);
+ continue;
+ }
+
+ quic_dgram_parse(new_dgram, qc, qc->li);
+ /* A datagram must always be consumed after quic_parse_dgram(). */
+ BUG_ON(new_dgram->buf);
+ } while (ret > 0);
+
+ pool_free(pool_head_quic_dgram, new_dgram);
+
+ if (b_size(&buf)) {
+ b_free(&buf);
+ offer_buffers(NULL, 1);
+ }
+
+ TRACE_LEAVE(QUIC_EV_CONN_RCV, qc);
+ return ret;
+}
+
+/* Allocate a socket file-descriptor specific for QUIC connection <qc>.
+ * Endpoint addresses are specified by the two following arguments : <src> is
+ * the local address and <dst> is the remote one.
+ *
+ * Return the socket FD or a negative error code. On error, socket is marked as
+ * uninitialized.
+ */
+void qc_alloc_fd(struct quic_conn *qc, const struct sockaddr_storage *src,
+ const struct sockaddr_storage *dst)
+{
+ struct bind_conf *bc = qc->li->bind_conf;
+ struct proxy *p = bc->frontend;
+ int fd = -1;
+ int ret;
+
+ /* Must not happen. */
+ BUG_ON(src->ss_family != dst->ss_family);
+
+ qc_init_fd(qc);
+
+ fd = socket(src->ss_family, SOCK_DGRAM, 0);
+ if (fd < 0)
+ goto err;
+
+ if (fd >= global.maxsock) {
+ send_log(p, LOG_EMERG,
+ "Proxy %s reached the configured maximum connection limit. Please check the global 'maxconn' value.\n",
+ p->id);
+ goto err;
+ }
+
+ ret = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
+ if (ret < 0)
+ goto err;
+
+ switch (src->ss_family) {
+ case AF_INET:
+#if defined(IP_PKTINFO)
+ ret = setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &one, sizeof(one));
+#elif defined(IP_RECVDSTADDR)
+ ret = setsockopt(fd, IPPROTO_IP, IP_RECVDSTADDR, &one, sizeof(one));
+#endif /* IP_PKTINFO || IP_RECVDSTADDR */
+ break;
+ case AF_INET6:
+#ifdef IPV6_RECVPKTINFO
+ ret = setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
+#endif
+ break;
+ }
+ if (ret < 0)
+ goto err;
+
+ ret = bind(fd, (struct sockaddr *)src, get_addr_len(src));
+ if (ret < 0) {
+ if (errno == EACCES) {
+ if (!quic_bind_eacces_warn) {
+ send_log(p, LOG_WARNING,
+ "Permission error on QUIC socket binding for proxy %s. Consider using setcap cap_net_bind_service (Linux only) or running as root.\n",
+ p->id);
+ quic_bind_eacces_warn = 1;
+ }
+
+ /* Fallback to listener socket for this receiver instance. */
+ HA_ATOMIC_STORE(&qc->li->rx.quic_mode, QUIC_SOCK_MODE_LSTNR);
+ }
+ goto err;
+ }
+
+ ret = connect(fd, (struct sockaddr *)dst, get_addr_len(dst));
+ if (ret < 0)
+ goto err;
+
+ qc->fd = fd;
+ fd_set_nonblock(fd);
+ fd_insert(fd, qc, quic_conn_sock_fd_iocb, tgid, ti->ltid_bit);
+ fd_want_recv(fd);
+
+ return;
+
+ err:
+ if (fd >= 0)
+ close(fd);
+}
+
+/* Release socket file-descriptor specific for QUIC connection <qc>. Set
+ * <reinit> if socket should be reinitialized after address migration.
+ */
+void qc_release_fd(struct quic_conn *qc, int reinit)
+{
+ if (qc_test_fd(qc)) {
+ fd_delete(qc->fd);
+ qc->fd = DEAD_FD_MAGIC;
+
+ if (reinit)
+ qc_init_fd(qc);
+ }
+}
+
+/* Wrapper for fd_want_recv(). Safe even if connection does not used its owned
+ * socket.
+ */
+void qc_want_recv(struct quic_conn *qc)
+{
+ if (qc_test_fd(qc))
+ fd_want_recv(qc->fd);
+}
+
+/*********************** QUIC accept queue management ***********************/
+/* per-thread accept queues */
+struct quic_accept_queue *quic_accept_queues;
+
+/* Install <qc> on the queue ready to be accepted. The queue task is then woken
+ * up. If <qc> accept is already scheduled or done, nothing is done.
+ */
+void quic_accept_push_qc(struct quic_conn *qc)
+{
+ struct quic_accept_queue *queue = &quic_accept_queues[tid];
+ struct li_per_thread *lthr = &qc->li->per_thr[ti->ltid];
+
+ /* early return if accept is already in progress/done for this
+ * connection
+ */
+ if (qc->flags & QUIC_FL_CONN_ACCEPT_REGISTERED)
+ return;
+
+ BUG_ON(MT_LIST_INLIST(&qc->accept_list));
+ HA_ATOMIC_INC(&qc->li->rx.quic_curr_accept);
+
+ qc->flags |= QUIC_FL_CONN_ACCEPT_REGISTERED;
+ /* 1. insert the listener in the accept queue
+ *
+ * Use TRY_APPEND as there is a possible race even with INLIST if
+ * multiple threads try to add the same listener instance from several
+ * quic_conn.
+ */
+ if (!MT_LIST_INLIST(&(lthr->quic_accept.list)))
+ MT_LIST_TRY_APPEND(&queue->listeners, &(lthr->quic_accept.list));
+
+ /* 2. insert the quic_conn in the listener per-thread queue. */
+ MT_LIST_APPEND(&lthr->quic_accept.conns, &qc->accept_list);
+
+ /* 3. wake up the queue tasklet */
+ tasklet_wakeup(quic_accept_queues[tid].tasklet);
+}
+
+/* Tasklet handler to accept QUIC connections. Call listener_accept on every
+ * listener instances registered in the accept queue.
+ */
+struct task *quic_accept_run(struct task *t, void *ctx, unsigned int i)
+{
+ struct li_per_thread *lthr;
+ struct mt_list *elt1, elt2;
+ struct quic_accept_queue *queue = &quic_accept_queues[tid];
+
+ mt_list_for_each_entry_safe(lthr, &queue->listeners, quic_accept.list, elt1, elt2) {
+ listener_accept(lthr->li);
+ if (!MT_LIST_ISEMPTY(&lthr->quic_accept.conns))
+ tasklet_wakeup((struct tasklet*)t);
+ else
+ MT_LIST_DELETE_SAFE(elt1);
+ }
+
+ return NULL;
+}
+
+/* Returns the maximum number of QUIC connections waiting for handshake to
+ * complete in parallel on listener <l> instance. This is directly based on
+ * listener backlog value.
+ */
+int quic_listener_max_handshake(const struct listener *l)
+{
+ return listener_backlog(l) / 2;
+}
+
+/* Returns the value which is considered as the maximum number of QUIC
+ * connections waiting to be accepted for listener <l> instance. This is
+ * directly based on listener backlog value.
+ */
+int quic_listener_max_accept(const struct listener *l)
+{
+ return listener_backlog(l) / 2;
+}
+
+static int quic_alloc_accept_queues(void)
+{
+ int i;
+
+ quic_accept_queues = calloc(global.nbthread,
+ sizeof(*quic_accept_queues));
+ if (!quic_accept_queues) {
+ ha_alert("Failed to allocate the quic accept queues.\n");
+ return 0;
+ }
+
+ for (i = 0; i < global.nbthread; ++i) {
+ struct tasklet *task;
+ if (!(task = tasklet_new())) {
+ ha_alert("Failed to allocate the quic accept queue on thread %d.\n", i);
+ return 0;
+ }
+
+ tasklet_set_tid(task, i);
+ task->process = quic_accept_run;
+ quic_accept_queues[i].tasklet = task;
+
+ MT_LIST_INIT(&quic_accept_queues[i].listeners);
+ }
+
+ return 1;
+}
+REGISTER_POST_CHECK(quic_alloc_accept_queues);
+
+static int quic_deallocate_accept_queues(void)
+{
+ int i;
+
+ if (quic_accept_queues) {
+ for (i = 0; i < global.nbthread; ++i)
+ tasklet_free(quic_accept_queues[i].tasklet);
+ free(quic_accept_queues);
+ }
+
+ return 1;
+}
+REGISTER_POST_DEINIT(quic_deallocate_accept_queues);