summaryrefslogtreecommitdiffstats
path: root/src/mux_h1.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-06-03 05:11:10 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-06-03 05:11:10 +0000
commitcff6d757e3ba609c08ef2aaa00f07e53551e5bf6 (patch)
tree08c4fc3255483ad397d712edb4214ded49149fd9 /src/mux_h1.c
parentAdding upstream version 2.9.7. (diff)
downloadhaproxy-cff6d757e3ba609c08ef2aaa00f07e53551e5bf6.tar.xz
haproxy-cff6d757e3ba609c08ef2aaa00f07e53551e5bf6.zip
Adding upstream version 3.0.0.upstream/3.0.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/mux_h1.c')
-rw-r--r--src/mux_h1.c770
1 files changed, 526 insertions, 244 deletions
diff --git a/src/mux_h1.c b/src/mux_h1.c
index 6593661..6bdaf71 100644
--- a/src/mux_h1.c
+++ b/src/mux_h1.c
@@ -227,7 +227,7 @@ enum {
};
-static struct name_desc h1_stats[] = {
+static struct stat_col h1_stats[] = {
[H1_ST_OPEN_CONN] = { .name = "h1_open_connections",
.desc = "Count of currently open connections" },
[H1_ST_OPEN_STREAM] = { .name = "h1_open_streams",
@@ -264,21 +264,54 @@ static struct h1_counters {
#endif
} h1_counters;
-static void h1_fill_stats(void *data, struct field *stats)
+static int h1_fill_stats(void *data, struct field *stats, unsigned int *selected_field)
{
struct h1_counters *counters = data;
+ unsigned int current_field = (selected_field != NULL ? *selected_field : 0);
- stats[H1_ST_OPEN_CONN] = mkf_u64(FN_GAUGE, counters->open_conns);
- stats[H1_ST_OPEN_STREAM] = mkf_u64(FN_GAUGE, counters->open_streams);
- stats[H1_ST_TOTAL_CONN] = mkf_u64(FN_COUNTER, counters->total_conns);
- stats[H1_ST_TOTAL_STREAM] = mkf_u64(FN_COUNTER, counters->total_streams);
+ for (; current_field < H1_STATS_COUNT; current_field++) {
+ struct field metric = { 0 };
- stats[H1_ST_BYTES_IN] = mkf_u64(FN_COUNTER, counters->bytes_in);
- stats[H1_ST_BYTES_OUT] = mkf_u64(FN_COUNTER, counters->bytes_out);
+ switch (current_field) {
+ case H1_ST_OPEN_CONN:
+ metric = mkf_u64(FN_GAUGE, counters->open_conns);
+ break;
+ case H1_ST_OPEN_STREAM:
+ metric = mkf_u64(FN_GAUGE, counters->open_streams);
+ break;
+ case H1_ST_TOTAL_CONN:
+ metric = mkf_u64(FN_COUNTER, counters->total_conns);
+ break;
+ case H1_ST_TOTAL_STREAM:
+ metric = mkf_u64(FN_COUNTER, counters->total_streams);
+ break;
+ case H1_ST_BYTES_IN:
+ metric = mkf_u64(FN_COUNTER, counters->bytes_in);
+ break;
+ case H1_ST_BYTES_OUT:
+ metric = mkf_u64(FN_COUNTER, counters->bytes_out);
+ break;
#if defined(USE_LINUX_SPLICE)
- stats[H1_ST_SPLICED_BYTES_IN] = mkf_u64(FN_COUNTER, counters->spliced_bytes_in);
- stats[H1_ST_SPLICED_BYTES_OUT] = mkf_u64(FN_COUNTER, counters->spliced_bytes_out);
+ case H1_ST_SPLICED_BYTES_IN:
+ metric = mkf_u64(FN_COUNTER, counters->spliced_bytes_in);
+ break;
+ case H1_ST_SPLICED_BYTES_OUT:
+ metric = mkf_u64(FN_COUNTER, counters->spliced_bytes_out);
+ break;
#endif
+ default:
+ /* not used for frontends. If a specific metric
+ * is requested, return an error. Otherwise continue.
+ */
+ if (selected_field != NULL)
+ return 0;
+ continue;
+ }
+ stats[current_field] = metric;
+ if (selected_field != NULL)
+ break;
+ }
+ return 1;
}
static struct stats_module h1_stats_module = {
@@ -302,6 +335,8 @@ DECLARE_STATIC_POOL(pool_head_h1s, "h1s", sizeof(struct h1s));
static int h1_recv(struct h1c *h1c);
static int h1_send(struct h1c *h1c);
static int h1_process(struct h1c *h1c);
+static void h1_release(struct h1c *h1c);
+
/* h1_io_cb is exported to see it resolved in "show fd" */
struct task *h1_io_cb(struct task *t, void *ctx, unsigned int state);
struct task *h1_timeout_task(struct task *t, void *context, unsigned int state);
@@ -466,45 +501,91 @@ static int h1_buf_available(void *target)
{
struct h1c *h1c = target;
- if ((h1c->flags & H1C_F_IN_ALLOC) && b_alloc(&h1c->ibuf)) {
- TRACE_STATE("unblocking h1c, ibuf allocated", H1_EV_H1C_RECV|H1_EV_H1C_BLK|H1_EV_H1C_WAKE, h1c->conn);
+ if (h1c->flags & H1C_F_IN_ALLOC) {
h1c->flags &= ~H1C_F_IN_ALLOC;
- if (h1_recv_allowed(h1c))
- tasklet_wakeup(h1c->wait_event.tasklet);
- return 1;
+ h1c->flags |= H1C_F_IN_MAYALLOC;
}
- if ((h1c->flags & H1C_F_OUT_ALLOC) && b_alloc(&h1c->obuf)) {
- TRACE_STATE("unblocking h1s, obuf allocated", H1_EV_TX_DATA|H1_EV_H1S_BLK|H1_EV_STRM_WAKE, h1c->conn, h1c->h1s);
+ if ((h1c->flags & H1C_F_OUT_ALLOC) && h1c->h1s) {
+ TRACE_STATE("unblocking h1s, obuf allocatable", H1_EV_TX_DATA|H1_EV_H1S_BLK|H1_EV_STRM_WAKE, h1c->conn, h1c->h1s);
h1c->flags &= ~H1C_F_OUT_ALLOC;
- if (h1c->h1s)
- h1_wake_stream_for_send(h1c->h1s);
- return 1;
+ h1c->flags |= H1C_F_OUT_MAYALLOC;
+ h1_wake_stream_for_send(h1c->h1s);
}
- if ((h1c->flags & H1C_F_IN_SALLOC) && h1c->h1s && b_alloc(&h1c->h1s->rxbuf)) {
- TRACE_STATE("unblocking h1c, stream rxbuf allocated", H1_EV_H1C_RECV|H1_EV_H1C_BLK|H1_EV_H1C_WAKE, h1c->conn);
+ if ((h1c->flags & H1C_F_IN_SALLOC) && h1c->h1s) {
+ TRACE_STATE("unblocking h1c, stream rxbuf allocatable", H1_EV_H1C_RECV|H1_EV_H1C_BLK|H1_EV_H1C_WAKE, h1c->conn);
h1c->flags &= ~H1C_F_IN_SALLOC;
+ h1c->flags |= H1C_F_IN_SMAYALLOC;
tasklet_wakeup(h1c->wait_event.tasklet);
- return 1;
}
- return 0;
+ if ((h1c->flags & H1C_F_IN_MAYALLOC) && h1_recv_allowed(h1c)) {
+ TRACE_STATE("unblocking h1c, ibuf allocatable", H1_EV_H1C_RECV|H1_EV_H1C_BLK|H1_EV_H1C_WAKE, h1c->conn);
+ tasklet_wakeup(h1c->wait_event.tasklet);
+ }
+
+ return 1;
+}
+
+/*
+ * Allocate the h1c's ibuf. If if fails, it adds the mux in buffer wait queue,
+ * and sets the H1C_F_IN_ALLOC flag on the connection. It will advertise a more
+ * urgent allocation when a stream is already present than when none is present
+ * since in one case a buffer might be needed to permit to release another one,
+ * while in the other case we've simply not started anything.
+ */
+static inline struct buffer *h1_get_ibuf(struct h1c *h1c)
+{
+ struct buffer *buf;
+
+ if (unlikely((buf = b_alloc(&h1c->ibuf, DB_MUX_RX |
+ ((h1c->flags & H1C_F_IN_MAYALLOC) ? DB_F_NOQUEUE : 0))) == NULL)) {
+ b_queue(DB_MUX_RX, &h1c->buf_wait, h1c, h1_buf_available);
+ h1c->flags |= H1C_F_IN_ALLOC;
+ }
+ else
+ h1c->flags &= ~H1C_F_IN_MAYALLOC;
+
+ return buf;
}
/*
- * Allocate a buffer. If if fails, it adds the mux in buffer wait queue.
+ * Allocate the h1c's obuf. If if fails, it adds the mux in buffer wait queue,
+ * and sets the H1C_F_OUT_ALLOC flag on the connection.
*/
-static inline struct buffer *h1_get_buf(struct h1c *h1c, struct buffer *bptr)
+static inline struct buffer *h1_get_obuf(struct h1c *h1c)
{
- struct buffer *buf = NULL;
+ struct buffer *buf;
- if (likely(!LIST_INLIST(&h1c->buf_wait.list)) &&
- unlikely((buf = b_alloc(bptr)) == NULL)) {
- h1c->buf_wait.target = h1c;
- h1c->buf_wait.wakeup_cb = h1_buf_available;
- LIST_APPEND(&th_ctx->buffer_wq, &h1c->buf_wait.list);
+ if (unlikely((buf = b_alloc(&h1c->obuf, DB_MUX_TX |
+ ((h1c->flags & H1C_F_OUT_MAYALLOC) ? DB_F_NOQUEUE : 0))) == NULL)) {
+ b_queue(DB_MUX_TX, &h1c->buf_wait, h1c, h1_buf_available);
+ h1c->flags |= H1C_F_OUT_ALLOC;
}
+ else
+ h1c->flags &= ~H1C_F_OUT_MAYALLOC;
+
+ return buf;
+}
+
+/*
+ * Allocate the h1s's rxbuf. If if fails, it adds the mux in buffer wait queue,
+ * and sets the H1C_F_IN_SALLOC flag on the connection.
+ */
+static inline struct buffer *h1_get_rxbuf(struct h1s *h1s)
+{
+ struct h1c *h1c = h1s->h1c;
+ struct buffer *buf;
+
+ if (unlikely((buf = b_alloc(&h1s->rxbuf, DB_SE_RX |
+ ((h1c->flags & H1C_F_IN_SMAYALLOC) ? DB_F_NOQUEUE : 0))) == NULL)) {
+ b_queue(DB_SE_RX, &h1c->buf_wait, h1c, h1_buf_available);
+ h1c->flags |= H1C_F_IN_SALLOC;
+ }
+ else
+ h1c->flags &= ~H1C_F_IN_SMAYALLOC;
+
return buf;
}
@@ -521,11 +602,11 @@ static inline void h1_release_buf(struct h1c *h1c, struct buffer *bptr)
}
/* Returns 1 if the H1 connection is alive (IDLE, EMBRYONIC, RUNNING or
- * RUNNING). Ortherwise 0 is returned.
+ * DRAINING). Ortherwise 0 is returned.
*/
static inline int h1_is_alive(const struct h1c *h1c)
{
- return (h1c->state <= H1_CS_RUNNING);
+ return (h1c->state <= H1_CS_DRAINING);
}
/* Switch the H1 connection to CLOSING or CLOSED mode, depending on the output
@@ -869,7 +950,8 @@ static void h1s_destroy(struct h1s *h1s)
h1_release_buf(h1c, &h1s->rxbuf);
h1c->flags &= ~(H1C_F_WANT_FASTFWD|
- H1C_F_OUT_FULL|H1C_F_OUT_ALLOC|H1C_F_IN_SALLOC|
+ H1C_F_OUT_FULL|H1C_F_OUT_ALLOC|H1C_F_OUT_MAYALLOC|
+ H1C_F_IN_SALLOC|H1C_F_IN_SMAYALLOC|
H1C_F_CO_MSG_MORE|H1C_F_CO_STREAMER);
if (!(h1c->flags & (H1C_F_EOS|H1C_F_ERR_PENDING|H1C_F_ERROR|H1C_F_ABRT_PENDING|H1C_F_ABRTED)) && /* No error/read0/abort */
@@ -893,6 +975,162 @@ static void h1s_destroy(struct h1s *h1s)
}
}
+
+/* Check if shutdown performed of an an H1S must lead to a connection shutdown
+ * of if it can be kept alive. It returns 1 if the connection must be shut down
+ * and 0 it if can be kept alive.
+ */
+static int h1s_must_shut_conn(struct h1s *h1s)
+{
+ struct h1c *h1c = h1s->h1c;
+ int ret;
+
+ TRACE_ENTER(H1_EV_STRM_SHUT, h1c->conn, h1s);
+
+ if (se_fl_test(h1s->sd, SE_FL_KILL_CONN)) {
+ TRACE_STATE("stream wants to kill the connection", H1_EV_STRM_SHUT, h1c->conn, h1s);
+ ret = 1;
+ }
+ else if (h1c->state == H1_CS_CLOSING || (h1c->flags & (H1C_F_EOS|H1C_F_ERR_PENDING|H1C_F_ERROR))) {
+ TRACE_STATE("shutdown on connection (EOS || CLOSING || ERROR)", H1_EV_STRM_SHUT, h1c->conn, h1s);
+ ret = 1;
+ }
+ else if (h1c->state == H1_CS_UPGRADING) {
+ TRACE_STATE("keep connection alive (UPGRADING)", H1_EV_STRM_SHUT, h1c->conn, h1s);
+ ret = 0;
+ }
+ else if (!(h1c->flags & H1C_F_IS_BACK) && h1s->req.state != H1_MSG_DONE && h1s->res.state == H1_MSG_DONE) {
+ TRACE_STATE("defer shutdown to drain request first", H1_EV_STRM_SHUT, h1c->conn, h1s);
+ ret = 0;
+ }
+ else if (((h1s->flags & H1S_F_WANT_KAL) && h1s->req.state == H1_MSG_DONE && h1s->res.state == H1_MSG_DONE)) {
+ TRACE_STATE("keep connection alive (want_kal)", H1_EV_STRM_SHUT, h1c->conn, h1s);
+ ret = 0;
+ }
+ else {
+ /* The default case, do the shutdown */
+ ret = 1;
+ }
+
+ TRACE_LEAVE(H1_EV_STRM_SHUT, h1c->conn, h1s);
+ return ret;
+}
+
+/* Really detach the H1S. Most of time of it called from h1_detach() when the
+ * stream is detached from the connection. But if the request message must be
+ * drained first, the detach is deferred.
+ */
+static void h1s_finish_detach(struct h1s *h1s)
+{
+ struct h1c *h1c;
+ struct session *sess;
+ int is_not_first;
+
+ TRACE_ENTER(H1_EV_STRM_END, h1s ? h1s->h1c->conn : NULL, h1s);
+
+ sess = h1s->sess;
+ h1c = h1s->h1c;
+
+ sess->accept_date = date;
+ sess->accept_ts = now_ns;
+ sess->t_handshake = 0;
+ sess->t_idle = -1;
+
+ is_not_first = h1s->flags & H1S_F_NOT_FIRST;
+ h1s_destroy(h1s);
+
+ if (h1c->state == H1_CS_IDLE && (h1c->flags & H1C_F_IS_BACK)) {
+ /* this connection may be killed at any moment, we want it to
+ * die "cleanly" (i.e. only an RST).
+ */
+ h1c->flags |= H1C_F_SILENT_SHUT;
+
+ /* If there are any excess server data in the input buffer,
+ * release it and close the connection ASAP (some data may
+ * remain in the output buffer). This happens if a server sends
+ * invalid responses. So in such case, we don't want to reuse
+ * the connection
+ */
+ if (b_data(&h1c->ibuf)) {
+ h1_release_buf(h1c, &h1c->ibuf);
+ h1_close(h1c);
+ TRACE_DEVEL("remaining data on detach, kill connection", H1_EV_STRM_END|H1_EV_H1C_END);
+ goto release;
+ }
+
+ if (h1c->conn->flags & CO_FL_PRIVATE) {
+ /* Add the connection in the session server list, if not already done */
+ if (!session_add_conn(sess, h1c->conn, h1c->conn->target)) {
+ h1c->conn->owner = NULL;
+ h1c->conn->mux->destroy(h1c);
+ goto end;
+ }
+ /* Always idle at this step */
+
+ /* mark that the tasklet may lose its context to another thread and
+ * that the handler needs to check it under the idle conns lock.
+ */
+ HA_ATOMIC_OR(&h1c->wait_event.tasklet->state, TASK_F_USR1);
+ if (session_check_idle_conn(sess, h1c->conn)) {
+ /* The connection got destroyed, let's leave */
+ TRACE_DEVEL("outgoing connection killed", H1_EV_STRM_END|H1_EV_H1C_END);
+ goto end;
+ }
+ }
+ else {
+ if (h1c->conn->owner == sess)
+ h1c->conn->owner = NULL;
+
+ /* mark that the tasklet may lose its context to another thread and
+ * that the handler needs to check it under the idle conns lock.
+ */
+ HA_ATOMIC_OR(&h1c->wait_event.tasklet->state, TASK_F_USR1);
+ h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
+ xprt_set_idle(h1c->conn, h1c->conn->xprt, h1c->conn->xprt_ctx);
+
+ if (!srv_add_to_idle_list(objt_server(h1c->conn->target), h1c->conn, is_not_first)) {
+ /* The server doesn't want it, let's kill the connection right away */
+ h1c->conn->mux->destroy(h1c);
+ TRACE_DEVEL("outgoing connection killed", H1_EV_STRM_END|H1_EV_H1C_END);
+ goto end;
+ }
+ /* At this point, the connection has been added to the
+ * server idle list, so another thread may already have
+ * hijacked it, so we can't do anything with it.
+ */
+ return;
+ }
+ }
+
+ release:
+ /* We don't want to close right now unless the connection is in error or shut down for writes */
+ if ((h1c->flags & H1C_F_ERROR) ||
+ (h1c->state == H1_CS_CLOSED) ||
+ (h1c->state == H1_CS_CLOSING && !b_data(&h1c->obuf)) ||
+ !h1c->conn->owner) {
+ TRACE_DEVEL("killing dead connection", H1_EV_STRM_END, h1c->conn);
+ h1_release(h1c);
+ }
+ else {
+ if (h1c->state == H1_CS_IDLE) {
+ /* If we have a new request, process it immediately or
+ * subscribe for reads waiting for new data
+ */
+ if (unlikely(b_data(&h1c->ibuf))) {
+ if (h1_process(h1c) == -1)
+ goto end;
+ }
+ else
+ h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
+ }
+ h1_set_idle_expiration(h1c);
+ h1_refresh_timeout(h1c);
+ }
+ end:
+ TRACE_LEAVE(H1_EV_STRM_END);
+}
+
+
/*
* Initialize the mux once it's attached. It is expected that conn->ctx points
* to the existing stream connector (for outgoing connections or for incoming
@@ -1049,9 +1287,7 @@ static void h1_release(struct h1c *h1c)
}
- if (LIST_INLIST(&h1c->buf_wait.list))
- LIST_DEL_INIT(&h1c->buf_wait.list);
-
+ b_dequeue(&h1c->buf_wait);
h1_release_buf(h1c, &h1c->ibuf);
h1_release_buf(h1c, &h1c->obuf);
@@ -1416,21 +1652,33 @@ static void h1_capture_bad_message(struct h1c *h1c, struct h1s *h1s,
&ctx, h1_show_error_snapshot);
}
-/* Emit the chunksize followed by a CRLF in front of data of the buffer
+/* Emit the chunk size <chksz> followed by a CRLF in front of data of the buffer
* <buf>. It goes backwards and starts with the byte before the buffer's
* head. The caller is responsible for ensuring there is enough room left before
- * the buffer's head for the string.
+ * the buffer's head for the string. if <length> is greater than 0, it
+ * represents the expected total length of the chunk size, including the
+ * CRLF. So it will be padded with 0 to resepct this length. It is the caller
+ * responsibility to pass the right value. if <length> is set to 0 (or less that
+ * the smallest size to represent the chunk size), it is ignored.
*/
-static void h1_prepend_chunk_size(struct buffer *buf, size_t chksz)
+static void h1_prepend_chunk_size(struct buffer *buf, size_t chksz, size_t length)
{
char *beg, *end;
beg = end = b_head(buf);
*--beg = '\n';
*--beg = '\r';
+ if (length)
+ length -= 2;
do {
*--beg = hextab[chksz & 0xF];
+ if (length)
+ --length;
} while (chksz >>= 4);
+ while (length) {
+ *--beg = '0';
+ --length;
+ }
buf->head -= (end - beg);
b_add(buf, end - beg);
}
@@ -2328,15 +2576,47 @@ static size_t h1_make_eoh(struct h1s *h1s, struct h1m *h1m, struct htx *htx, siz
b_slow_realign(&h1c->obuf, trash.area, b_data(&h1c->obuf));
outbuf = b_make(b_tail(&h1c->obuf), b_contig_space(&h1c->obuf), 0, 0);
+ /* Deal with removed "Content-Length" or "Transfer-Encoding" headers during analysis */
+ if (((h1m->flags & H1_MF_CLEN) && !(h1s->flags & H1S_F_HAVE_CLEN))||
+ ((h1m->flags & H1_MF_CHNK) && !(h1s->flags & H1S_F_HAVE_CHNK))) {
+ TRACE_STATE("\"Content-Length\" or \"Transfer-Encoding\" header removed during analysis", H1_EV_TX_DATA|H1_EV_TX_HDRS, h1c->conn, h1s);
+
+ if (h1s->flags & (H1S_F_HAVE_CLEN|H1S_F_HAVE_CHNK)) {
+ /* At least on header is present, we can continue */
+ if (!(h1s->flags & H1S_F_HAVE_CLEN)) {
+ h1m->curr_len = h1m->body_len = 0;
+ h1m->flags &= ~H1_MF_CLEN;
+ }
+ else /* h1s->flags & H1S_F_HAVE_CHNK */
+ h1m->flags &= ~(H1_MF_XFER_ENC|H1_MF_CHNK);
+ }
+ else {
+ /* Both headers are missing */
+ if (h1m->flags & H1_MF_RESP) {
+ /* It is a esponse: Switch to unknown xfer length */
+ h1m->flags &= ~(H1_MF_XFER_LEN|H1_MF_XFER_ENC|H1_MF_CLEN|H1_MF_CHNK);
+ h1s->flags &= ~(H1S_F_HAVE_CLEN|H1S_F_HAVE_CHNK);
+ TRACE_STATE("Switch response to unknown XFER length", H1_EV_TX_DATA|H1_EV_TX_HDRS, h1c->conn, h1s);
+ }
+ else {
+ /* It is the request: Add "Content-Length: 0" header and skip payload */
+ struct ist n = ist("content-length");
+ if (h1c->px->options2 & (PR_O2_H1_ADJ_BUGCLI|PR_O2_H1_ADJ_BUGSRV))
+ h1_adjust_case_outgoing_hdr(h1s, h1m, &n);
+ if (!h1_format_htx_hdr(n, ist("0"), &outbuf))
+ goto full;
+
+ h1m->flags = (h1m->flags & ~(H1_MF_XFER_ENC|H1_MF_CHNK)) | H1_MF_CLEN;
+ h1s->flags = (h1s->flags & ~H1S_F_HAVE_CHNK) | (H1S_F_HAVE_CLEN|H1S_F_BODYLESS_REQ);
+ h1m->curr_len = h1m->body_len = 0;
+ TRACE_STATE("Set request content-length to 0 and skip payload", H1_EV_TX_DATA|H1_EV_TX_HDRS, h1c->conn, h1s);
+ }
+ }
+ }
+
/* Deal with "Connection" header */
if (!(h1s->flags & H1S_F_HAVE_O_CONN)) {
- if ((htx->flags & HTX_FL_PROXY_RESP) && h1s->req.state != H1_MSG_DONE) {
- /* If the reply comes from haproxy while the request is
- * not finished, we force the connection close. */
- h1s->flags = (h1s->flags & ~H1S_F_WANT_MSK) | H1S_F_WANT_CLO;
- TRACE_STATE("force close mode (resp)", H1_EV_TX_DATA|H1_EV_TX_HDRS, h1s->h1c->conn, h1s);
- }
- else if ((h1m->flags & (H1_MF_XFER_ENC|H1_MF_CLEN)) == (H1_MF_XFER_ENC|H1_MF_CLEN)) {
+ if ((h1m->flags & (H1_MF_XFER_ENC|H1_MF_CLEN)) == (H1_MF_XFER_ENC|H1_MF_CLEN)) {
/* T-E + C-L: force close */
h1s->flags = (h1s->flags & ~H1S_F_WANT_MSK) | H1S_F_WANT_CLO;
h1m->flags &= ~H1_MF_CLEN;
@@ -2384,23 +2664,6 @@ static size_t h1_make_eoh(struct h1s *h1s, struct h1m *h1m, struct htx *htx, siz
h1s->flags |= H1S_F_HAVE_CHNK;
}
- /* Deal with "Content-Length header */
- if ((h1m->flags & H1_MF_CLEN) && !(h1s->flags & H1S_F_HAVE_CLEN)) {
- char *end;
-
- h1m->curr_len = h1m->body_len = htx->data + htx->extra - sz;
- end = DISGUISE(ulltoa(h1m->body_len, trash.area, b_size(&trash)));
-
- n = ist("content-length");
- v = ist2(trash.area, end-trash.area);
- if (h1c->px->options2 & (PR_O2_H1_ADJ_BUGCLI|PR_O2_H1_ADJ_BUGSRV))
- h1_adjust_case_outgoing_hdr(h1s, h1m, &n);
- if (!h1_format_htx_hdr(n, v, &outbuf))
- goto full;
- TRACE_STATE("add \"Content-Length: <LEN>\"", H1_EV_TX_DATA|H1_EV_TX_HDRS, h1c->conn, h1s);
- h1s->flags |= H1S_F_HAVE_CLEN;
- }
-
/* Add the server name to a header (if requested) */
if (!(h1s->flags & H1S_F_HAVE_SRV_NAME) &&
!(h1m->flags & H1_MF_RESP) && isttest(h1c->px->server_id_hdr_name)) {
@@ -2555,7 +2818,8 @@ static size_t h1_make_data(struct h1s *h1s, struct h1m *h1m, struct buffer *buf,
* end-to-end. This is the situation that happens all the time with
* large files.
*/
- if ((!(h1m->flags & H1_MF_RESP) || !(h1s->flags & H1S_F_BODYLESS_RESP)) &&
+ if (((!(h1m->flags & H1_MF_RESP) && !(h1s->flags & H1S_F_BODYLESS_REQ)) ||
+ ((h1m->flags & H1_MF_RESP) && !(h1s->flags & H1S_F_BODYLESS_RESP))) &&
!b_data(&h1c->obuf) &&
(!(h1m->flags & H1_MF_CHNK) || ((h1m->flags & H1_MF_CHNK) && (!h1m->curr_len || count == h1m->curr_len))) &&
htx_nbblks(htx) == 1 &&
@@ -2612,7 +2876,7 @@ static size_t h1_make_data(struct h1s *h1s, struct h1m *h1m, struct buffer *buf,
/* Because chunk meta-data are prepended, the chunk size of the current chunk
* must be handled before the end of the previous chunk.
*/
- h1_prepend_chunk_size(&h1c->obuf, h1m->curr_len);
+ h1_prepend_chunk_size(&h1c->obuf, h1m->curr_len, 0);
if (h1m->state == H1_MSG_CHUNK_CRLF)
h1_prepend_chunk_crlf(&h1c->obuf);
@@ -2682,8 +2946,9 @@ static size_t h1_make_data(struct h1s *h1s, struct h1m *h1m, struct buffer *buf,
last_data = 1;
}
- if ((h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_RESP)) {
- TRACE_PROTO("Skip data for bodyless response", H1_EV_TX_DATA|H1_EV_TX_BODY, h1c->conn, h1s, htx);
+ if ((!(h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_REQ)) ||
+ ((h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_RESP))) {
+ TRACE_PROTO("Skip data for bodyless message", H1_EV_TX_DATA|H1_EV_TX_BODY, h1c->conn, h1s, htx);
goto nextblk;
}
@@ -2754,7 +3019,8 @@ static size_t h1_make_data(struct h1s *h1s, struct h1m *h1m, struct buffer *buf,
}
else if (type == HTX_BLK_EOT || type == HTX_BLK_TLR) {
- if ((h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_RESP)) {
+ if ((!(h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_REQ)) ||
+ ((h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_RESP))) {
/* Do nothing the payload must be skipped
* because it is a bodyless response
*/
@@ -2954,7 +3220,9 @@ static size_t h1_make_trailers(struct h1s *h1s, struct h1m *h1m, struct htx *htx
if (sz > count)
goto error;
- if (!(h1m->flags & H1_MF_CHNK) || ((h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_RESP)))
+ if (!(h1m->flags & H1_MF_CHNK) ||
+ (!(h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_REQ)) ||
+ ((h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_RESP)))
goto nextblk;
n = htx_get_blk_name(htx, blk);
@@ -2967,7 +3235,9 @@ static size_t h1_make_trailers(struct h1s *h1s, struct h1m *h1m, struct htx *htx
goto full;
}
else if (type == HTX_BLK_EOT) {
- if (!(h1m->flags & H1_MF_CHNK) || ((h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_RESP))) {
+ if (!(h1m->flags & H1_MF_CHNK) ||
+ (!(h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_REQ)) ||
+ ((h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_RESP))) {
TRACE_PROTO((!(h1m->flags & H1_MF_RESP) ? "H1 request trailers skipped" : "H1 response trailers skipped"),
H1_EV_TX_DATA|H1_EV_TX_TLRS, h1c->conn, h1s);
}
@@ -3023,8 +3293,7 @@ static size_t h1_make_chunk(struct h1s *h1s, struct h1m * h1m, size_t len)
TRACE_ENTER(H1_EV_TX_DATA|H1_EV_TX_BODY, h1c->conn, h1s);
- if (!h1_get_buf(h1c, &h1c->obuf)) {
- h1c->flags |= H1C_F_OUT_ALLOC;
+ if (!h1_get_obuf(h1c)) {
TRACE_STATE("waiting for h1c obuf allocation", H1_EV_TX_DATA|H1_EV_H1S_BLK, h1c->conn, h1s);
goto end;
}
@@ -3077,8 +3346,7 @@ static size_t h1_process_mux(struct h1c *h1c, struct buffer *buf, size_t count)
if (h1s->flags & (H1S_F_INTERNAL_ERROR|H1S_F_PROCESSING_ERROR|H1S_F_TX_BLK))
goto end;
- if (!h1_get_buf(h1c, &h1c->obuf)) {
- h1c->flags |= H1C_F_OUT_ALLOC;
+ if (!h1_get_obuf(h1c)) {
TRACE_STATE("waiting for h1c obuf allocation", H1_EV_TX_DATA|H1_EV_H1S_BLK, h1c->conn, h1s);
goto end;
}
@@ -3252,8 +3520,8 @@ static int h1_send_error(struct h1c *h1c)
goto out;
}
- if (!h1_get_buf(h1c, &h1c->obuf)) {
- h1c->flags |= (H1C_F_OUT_ALLOC|H1C_F_ABRT_PENDING);
+ if (!h1_get_obuf(h1c)) {
+ h1c->flags |= H1C_F_ABRT_PENDING;
TRACE_STATE("waiting for h1c obuf allocation", H1_EV_H1C_ERR|H1_EV_H1C_BLK, h1c->conn);
goto out;
}
@@ -3291,6 +3559,11 @@ static int h1_handle_internal_err(struct h1c *h1c)
struct session *sess = h1c->conn->owner;
int ret = 0;
+ if (h1c->state == H1_CS_DRAINING) {
+ h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED;
+ h1s_destroy(h1c->h1s);
+ goto end;
+ }
session_inc_http_req_ctr(sess);
proxy_inc_fe_req_ctr(sess->listener, sess->fe, 1);
_HA_ATOMIC_INC(&sess->fe->fe_counters.p.http.rsp[5]);
@@ -3301,6 +3574,7 @@ static int h1_handle_internal_err(struct h1c *h1c)
h1c->errcode = 500;
ret = h1_send_error(h1c);
sess_log(sess);
+ end:
return ret;
}
@@ -3314,6 +3588,11 @@ static int h1_handle_parsing_error(struct h1c *h1c)
struct session *sess = h1c->conn->owner;
int ret = 0;
+ if (h1c->state == H1_CS_DRAINING) {
+ h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED;
+ h1s_destroy(h1c->h1s);
+ goto end;
+ }
if (!b_data(&h1c->ibuf) && ((h1c->flags & H1C_F_WAIT_NEXT_REQ) || (sess->fe->options & PR_O_IGNORE_PRB))) {
h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED;
h1_close(h1c);
@@ -3347,6 +3626,11 @@ static int h1_handle_not_impl_err(struct h1c *h1c)
struct session *sess = h1c->conn->owner;
int ret = 0;
+ if (h1c->state == H1_CS_DRAINING) {
+ h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED;
+ h1s_destroy(h1c->h1s);
+ goto end;
+ }
if (!b_data(&h1c->ibuf) && ((h1c->flags & H1C_F_WAIT_NEXT_REQ) || (sess->fe->options & PR_O_IGNORE_PRB))) {
h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED;
h1_close(h1c);
@@ -3377,6 +3661,11 @@ static int h1_handle_req_tout(struct h1c *h1c)
struct session *sess = h1c->conn->owner;
int ret = 0;
+ if (h1c->state == H1_CS_DRAINING) {
+ h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED;
+ h1s_destroy(h1c->h1s);
+ goto end;
+ }
if (!b_data(&h1c->ibuf) && ((h1c->flags & H1C_F_WAIT_NEXT_REQ) || (sess->fe->options & PR_O_IGNORE_PRB))) {
h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED;
h1_close(h1c);
@@ -3421,8 +3710,7 @@ static int h1_recv(struct h1c *h1c)
return 1;
}
- if (!h1_get_buf(h1c, &h1c->ibuf)) {
- h1c->flags |= H1C_F_IN_ALLOC;
+ if (!h1_get_ibuf(h1c)) {
TRACE_STATE("waiting for h1c ibuf allocation", H1_EV_H1C_RECV|H1_EV_H1C_BLK, h1c->conn);
return 0;
}
@@ -3594,7 +3882,7 @@ static int h1_process(struct h1c * h1c)
/* Try to parse now the first block of a request, creating the H1 stream if necessary */
if (b_data(&h1c->ibuf) && /* Input data to be processed */
- (h1c->state < H1_CS_RUNNING) && /* IDLE, EMBRYONIC or UPGRADING */
+ ((h1c->state < H1_CS_RUNNING) || (h1c->state == H1_CS_DRAINING)) && /* IDLE, EMBRYONIC, UPGRADING or DRAINING */
!(h1c->flags & (H1C_F_IN_SALLOC|H1C_F_ABRT_PENDING))) { /* No allocation failure on the stream rxbuf and no ERROR on the H1C */
struct h1s *h1s = h1c->h1s;
struct buffer *buf;
@@ -3605,7 +3893,8 @@ static int h1_process(struct h1c * h1c)
goto release;
/* First of all handle H1 to H2 upgrade (no need to create the H1 stream) */
- if (!(h1c->flags & H1C_F_WAIT_NEXT_REQ) && /* First request */
+ if (h1c->state != H1_CS_DRAINING && /* Not draining message */
+ !(h1c->flags & H1C_F_WAIT_NEXT_REQ) && /* First request */
!(h1c->px->options2 & PR_O2_NO_H2_UPGRADE) && /* H2 upgrade supported by the proxy */
!(conn->mux->flags & MX_FL_NO_UPG)) { /* the current mux supports upgrades */
/* Try to match H2 preface before parsing the request headers. */
@@ -3635,9 +3924,8 @@ static int h1_process(struct h1c * h1c)
h1s->sess->t_idle = ns_to_ms(now_ns - h1s->sess->accept_ts) - h1s->sess->t_handshake;
/* Get the stream rxbuf */
- buf = h1_get_buf(h1c, &h1s->rxbuf);
+ buf = h1_get_rxbuf(h1s);
if (!buf) {
- h1c->flags |= H1C_F_IN_SALLOC;
TRACE_STATE("waiting for stream rxbuf allocation", H1_EV_H1C_WAKE|H1_EV_H1C_BLK, h1c->conn);
return 0;
}
@@ -3646,7 +3934,7 @@ static int h1_process(struct h1c * h1c)
h1_process_demux(h1c, buf, count);
h1_release_buf(h1c, &h1s->rxbuf);
h1_set_idle_expiration(h1c);
- if (h1c->state < H1_CS_RUNNING) {
+ if (h1c->state != H1_CS_RUNNING) { // TODO: be sure state cannot change in h1_process_demux
if (h1s->flags & H1S_F_INTERNAL_ERROR) {
h1_handle_internal_err(h1c);
TRACE_ERROR("internal error detected", H1_EV_H1C_WAKE|H1_EV_H1C_ERR);
@@ -3689,6 +3977,11 @@ static int h1_process(struct h1c * h1c)
if (h1_send_error(h1c))
h1_send(h1c);
}
+ else if (h1c->state == H1_CS_DRAINING) {
+ BUG_ON(h1c->h1s->sd && !se_fl_test(h1c->h1s->sd, SE_FL_ORPHAN));
+ h1s_destroy(h1c->h1s);
+ TRACE_STATE("abort/error when draining message. destroy h1s and close h1c", H1_EV_H1S_END, h1c->conn);
+ }
else {
h1_close(h1c);
TRACE_STATE("close h1c", H1_EV_H1S_END, h1c->conn);
@@ -3717,6 +4010,17 @@ static int h1_process(struct h1c * h1c)
h1_alert(h1s);
}
}
+ else if (h1c->state == H1_CS_DRAINING) {
+ BUG_ON(!h1c->h1s);
+ if (se_fl_test(h1c->h1s->sd, SE_FL_EOI)) {
+ if (h1s_must_shut_conn(h1c->h1s)) {
+ h1_shutw_conn(conn);
+ goto release;
+ }
+ h1s_finish_detach(h1c->h1s);
+ goto end;
+ }
+ }
if (!b_data(&h1c->ibuf))
h1_release_buf(h1c, &h1c->ibuf);
@@ -4025,8 +4329,6 @@ static void h1_detach(struct sedesc *sd)
{
struct h1s *h1s = sd->se;
struct h1c *h1c;
- struct session *sess;
- int is_not_first;
TRACE_ENTER(H1_EV_STRM_END, h1s ? h1s->h1c->conn : NULL, h1s);
@@ -4034,149 +4336,47 @@ static void h1_detach(struct sedesc *sd)
TRACE_LEAVE(H1_EV_STRM_END);
return;
}
-
- sess = h1s->sess;
h1c = h1s->h1c;
- sess->accept_date = date;
- sess->accept_ts = now_ns;
- sess->t_handshake = 0;
- sess->t_idle = -1;
-
- is_not_first = h1s->flags & H1S_F_NOT_FIRST;
- h1s_destroy(h1s);
-
- if (h1c->state == H1_CS_IDLE && (h1c->flags & H1C_F_IS_BACK)) {
- /* this connection may be killed at any moment, we want it to
- * die "cleanly" (i.e. only an RST).
+ if (h1c->state == H1_CS_RUNNING && !(h1c->flags & H1C_F_IS_BACK) && h1s->req.state != H1_MSG_DONE) {
+ h1c->state = H1_CS_DRAINING;
+ TRACE_DEVEL("Deferring H1S destroy to drain message", H1_EV_STRM_END, h1s->h1c->conn, h1s);
+ /* If we have a pending data, process it immediately or
+ * subscribe for reads waiting for new data
*/
- h1c->flags |= H1C_F_SILENT_SHUT;
-
- /* If there are any excess server data in the input buffer,
- * release it and close the connection ASAP (some data may
- * remain in the output buffer). This happens if a server sends
- * invalid responses. So in such case, we don't want to reuse
- * the connection
- */
- if (b_data(&h1c->ibuf)) {
- h1_release_buf(h1c, &h1c->ibuf);
- h1_close(h1c);
- TRACE_DEVEL("remaining data on detach, kill connection", H1_EV_STRM_END|H1_EV_H1C_END);
- goto release;
- }
-
- if (h1c->conn->flags & CO_FL_PRIVATE) {
- /* Add the connection in the session server list, if not already done */
- if (!session_add_conn(sess, h1c->conn, h1c->conn->target)) {
- h1c->conn->owner = NULL;
- h1c->conn->mux->destroy(h1c);
+ if (unlikely(b_data(&h1c->ibuf))) {
+ if (h1_process(h1c) == -1)
goto end;
- }
- /* Always idle at this step */
- if (session_check_idle_conn(sess, h1c->conn)) {
- /* The connection got destroyed, let's leave */
- TRACE_DEVEL("outgoing connection killed", H1_EV_STRM_END|H1_EV_H1C_END);
- goto end;
- }
}
- else {
- if (h1c->conn->owner == sess)
- h1c->conn->owner = NULL;
-
- /* mark that the tasklet may lose its context to another thread and
- * that the handler needs to check it under the idle conns lock.
- */
- HA_ATOMIC_OR(&h1c->wait_event.tasklet->state, TASK_F_USR1);
+ else
h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
- xprt_set_idle(h1c->conn, h1c->conn->xprt, h1c->conn->xprt_ctx);
-
- if (!srv_add_to_idle_list(objt_server(h1c->conn->target), h1c->conn, is_not_first)) {
- /* The server doesn't want it, let's kill the connection right away */
- h1c->conn->mux->destroy(h1c);
- TRACE_DEVEL("outgoing connection killed", H1_EV_STRM_END|H1_EV_H1C_END);
- goto end;
- }
- /* At this point, the connection has been added to the
- * server idle list, so another thread may already have
- * hijacked it, so we can't do anything with it.
- */
- return;
- }
- }
-
- release:
- /* We don't want to close right now unless the connection is in error or shut down for writes */
- if ((h1c->flags & H1C_F_ERROR) ||
- (h1c->state == H1_CS_CLOSED) ||
- (h1c->state == H1_CS_CLOSING && !b_data(&h1c->obuf)) ||
- !h1c->conn->owner) {
- TRACE_DEVEL("killing dead connection", H1_EV_STRM_END, h1c->conn);
- h1_release(h1c);
- }
- else {
- if (h1c->state == H1_CS_IDLE) {
- /* If we have a new request, process it immediately or
- * subscribe for reads waiting for new data
- */
- if (unlikely(b_data(&h1c->ibuf))) {
- if (h1_process(h1c) == -1)
- goto end;
- }
- else
- h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
- }
h1_set_idle_expiration(h1c);
h1_refresh_timeout(h1c);
}
+ else
+ h1s_finish_detach(h1s);
+
end:
TRACE_LEAVE(H1_EV_STRM_END);
}
-
-static void h1_shutr(struct stconn *sc, enum co_shr_mode mode)
+static void h1_shut(struct stconn *sc, enum se_shut_mode mode, struct se_abort_info *reason)
{
struct h1s *h1s = __sc_mux_strm(sc);
struct h1c *h1c;
- if (!h1s)
- return;
- h1c = h1s->h1c;
-
- TRACE_POINT(H1_EV_STRM_SHUT, h1c->conn, h1s, 0, (size_t[]){mode});
-}
-
-static void h1_shutw(struct stconn *sc, enum co_shw_mode mode)
-{
- struct h1s *h1s = __sc_mux_strm(sc);
- struct h1c *h1c;
-
- if (!h1s)
+ if (!h1s || !(mode & (SE_SHW_SILENT|SE_SHW_NORMAL)))
return;
h1c = h1s->h1c;
TRACE_ENTER(H1_EV_STRM_SHUT, h1c->conn, h1s, 0, (size_t[]){mode});
- if (se_fl_test(h1s->sd, SE_FL_KILL_CONN)) {
- TRACE_STATE("stream wants to kill the connection", H1_EV_STRM_SHUT, h1c->conn, h1s);
- goto do_shutw;
- }
- if (h1c->state == H1_CS_CLOSING || (h1c->flags & (H1C_F_EOS|H1C_F_ERR_PENDING|H1C_F_ERROR))) {
- TRACE_STATE("shutdown on connection (EOS || CLOSING || ERROR)", H1_EV_STRM_SHUT, h1c->conn, h1s);
- goto do_shutw;
- }
-
- if (h1c->state == H1_CS_UPGRADING) {
- TRACE_STATE("keep connection alive (UPGRADING)", H1_EV_STRM_SHUT, h1c->conn, h1s);
+ if (!h1s_must_shut_conn(h1s))
goto end;
- }
- if (((h1s->flags & H1S_F_WANT_KAL) && h1s->req.state == H1_MSG_DONE && h1s->res.state == H1_MSG_DONE)) {
- TRACE_STATE("keep connection alive (want_kal)", H1_EV_STRM_SHUT, h1c->conn, h1s);
- goto end;
- }
do_shutw:
h1_close(h1c);
- if (mode != CO_SHW_NORMAL)
+ if (mode & SE_SHW_NORMAL)
h1c->flags |= H1C_F_SILENT_SHUT;
if (!b_data(&h1c->obuf))
@@ -4405,12 +4605,12 @@ static inline struct sedesc *h1s_opposite_sd(struct h1s *h1s)
return sdo;
}
-static size_t h1_nego_ff(struct stconn *sc, struct buffer *input, size_t count, unsigned int may_splice)
+static size_t h1_nego_ff(struct stconn *sc, struct buffer *input, size_t count, unsigned int flags)
{
struct h1s *h1s = __sc_mux_strm(sc);
struct h1c *h1c = h1s->h1c;
struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->res : &h1s->req);
- size_t ret = 0;
+ size_t sz, offset = 0, ret = 0;
TRACE_ENTER(H1_EV_STRM_SEND, h1c->conn, h1s, 0, (size_t[]){count});
@@ -4420,21 +4620,55 @@ static size_t h1_nego_ff(struct stconn *sc, struct buffer *input, size_t count,
goto out;
}
- /* TODO: add check on curr_len if CLEN */
+ if ((!(h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_REQ)) ||
+ ((h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_RESP))) {
+ TRACE_STATE("Bodyless message, disable fastfwd", H1_EV_STRM_SEND|H1_EV_STRM_ERR, h1c->conn, h1s);
+ h1s->sd->iobuf.flags |= IOBUF_FL_NO_FF;
+ goto out;
+ }
- if (h1m->flags & H1_MF_CHNK) {
+ if (h1m->flags & H1_MF_CLEN) {
+ if ((flags & NEGO_FF_FL_EXACT_SIZE) && count > h1m->curr_len) {
+ TRACE_ERROR("more payload than announced", H1_EV_STRM_SEND|H1_EV_STRM_ERR, h1c->conn, h1s);
+ h1s->sd->iobuf.flags |= IOBUF_FL_NO_FF;
+ goto out;
+ }
+ }
+ else if (h1m->flags & H1_MF_CHNK) {
if (h1m->curr_len) {
BUG_ON(h1m->state != H1_MSG_DATA);
- if (count > h1m->curr_len)
+ if (count > h1m->curr_len) {
+ if ((flags & NEGO_FF_FL_EXACT_SIZE) && count > h1m->curr_len) {
+ TRACE_ERROR("chunk bigger than announced", H1_EV_STRM_SEND|H1_EV_STRM_ERR, h1c->conn, h1s);
+ h1s->sd->iobuf.flags |= IOBUF_FL_NO_FF;
+ goto out;
+ }
count = h1m->curr_len;
+ }
}
else {
BUG_ON(h1m->state != H1_MSG_CHUNK_CRLF && h1m->state != H1_MSG_CHUNK_SIZE);
- if (!h1_make_chunk(h1s, h1m, count)) {
+ if (flags & NEGO_FF_FL_EXACT_SIZE) {
+ if (!h1_make_chunk(h1s, h1m, count)) {
h1s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
- goto out;
+ goto out;
+ }
+ h1m->curr_len = count;
+ }
+ else {
+ /* The producer does not know the chunk size, thus this will be emitted at the
+ * end, in done_ff(). So splicing cannot be used (see TODO below).
+ * We will reserve 10 bytes to handle at most 4Go chunk !
+ * (<8-bytes SIZE><CRLF><CHUNK-DATA>)
+ */
+ if (count > MAX_RANGE(unsigned int))
+ count = MAX_RANGE(unsigned int);
+ offset = 10;
+ /* Add 2 more bytes to finish the previous chunk */
+ if (h1m->state == H1_MSG_CHUNK_CRLF)
+ offset += 2;
+ goto no_splicing;
}
- h1m->curr_len = count;
}
}
@@ -4445,7 +4679,7 @@ static size_t h1_nego_ff(struct stconn *sc, struct buffer *input, size_t count,
* and then data in pipe, or the opposite. For now, it is not
* supported to mix data.
*/
- if (!b_data(input) && !b_data(&h1c->obuf) && may_splice) {
+ if (!b_data(input) && !b_data(&h1c->obuf) && (flags & NEGO_FF_FL_MAY_SPLICE)) {
#if defined(USE_LINUX_SPLICE)
if (h1c->conn->xprt->snd_pipe && (h1s->sd->iobuf.pipe || (pipes_used < global.maxpipes && (h1s->sd->iobuf.pipe = get_pipe())))) {
h1s->sd->iobuf.offset = 0;
@@ -4458,8 +4692,8 @@ static size_t h1_nego_ff(struct stconn *sc, struct buffer *input, size_t count,
TRACE_DEVEL("Unable to allocate pipe for splicing, fallback to buffer", H1_EV_STRM_SEND, h1c->conn, h1s);
}
- if (!h1_get_buf(h1c, &h1c->obuf)) {
- h1c->flags |= H1C_F_OUT_ALLOC;
+ no_splicing:
+ if (!h1_get_obuf(h1c)) {
h1s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
TRACE_STATE("waiting for opposite h1c obuf allocation", H1_EV_STRM_SEND|H1_EV_H1S_BLK, h1c->conn, h1s);
goto out;
@@ -4468,21 +4702,22 @@ static size_t h1_nego_ff(struct stconn *sc, struct buffer *input, size_t count,
if (b_space_wraps(&h1c->obuf))
b_slow_realign(&h1c->obuf, trash.area, b_data(&h1c->obuf));
- h1s->sd->iobuf.buf = &h1c->obuf;
- h1s->sd->iobuf.offset = 0;
- h1s->sd->iobuf.data = 0;
-
- /* Cannot forward more than available room in output buffer */
- if (count > b_room(&h1c->obuf))
- count = b_room(&h1c->obuf);
-
- if (!count) {
+ if (b_contig_space(&h1c->obuf) <= offset) {
h1c->flags |= H1C_F_OUT_FULL;
h1s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
TRACE_STATE("output buffer full", H1_EV_STRM_SEND|H1_EV_H1S_BLK, h1c->conn, h1s);
goto out;
}
+ /* Cannot forward more than available room in output buffer */
+ sz = b_contig_space(&h1c->obuf) - offset;
+ if (count > sz)
+ count = sz;
+
+ h1s->sd->iobuf.buf = &h1c->obuf;
+ h1s->sd->iobuf.offset = offset;
+ h1s->sd->iobuf.data = 0;
+
/* forward remaining input data */
if (b_data(input)) {
size_t xfer = count;
@@ -4529,6 +4764,17 @@ static size_t h1_done_ff(struct stconn *sc)
if (b_room(&h1c->obuf) == sd->iobuf.offset)
h1c->flags |= H1C_F_OUT_FULL;
+ if (sd->iobuf.data && sd->iobuf.offset) {
+ struct buffer buf = b_make(b_orig(&h1c->obuf), b_size(&h1c->obuf),
+ b_peek_ofs(&h1c->obuf, b_data(&h1c->obuf) - sd->iobuf.data + sd->iobuf.offset),
+ sd->iobuf.data);
+ h1_prepend_chunk_size(&buf, sd->iobuf.data, sd->iobuf.offset - ((h1m->state == H1_MSG_CHUNK_CRLF) ? 2 : 0));
+ if (h1m->state == H1_MSG_CHUNK_CRLF)
+ h1_prepend_chunk_crlf(&buf);
+ b_add(&h1c->obuf, sd->iobuf.offset);
+ h1m->state = H1_MSG_CHUNK_CRLF;
+ }
+
total = sd->iobuf.data;
sd->iobuf.buf = NULL;
sd->iobuf.offset = 0;
@@ -4583,6 +4829,7 @@ static int h1_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags)
struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->req : &h1s->res);
struct sedesc *sdo = NULL;
size_t total = 0, try = 0;
+ unsigned int nego_flags = NEGO_FF_FL_NONE;
int ret = 0;
TRACE_ENTER(H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){count});
@@ -4612,10 +4859,15 @@ static int h1_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags)
retry:
ret = 0;
- if (h1m->state == H1_MSG_DATA && (h1m->flags & (H1_MF_CHNK|H1_MF_CLEN)) && count > h1m->curr_len)
+ if (h1m->state == H1_MSG_DATA && (h1m->flags & (H1_MF_CHNK|H1_MF_CLEN)) && count > h1m->curr_len) {
+ flags |= NEGO_FF_FL_EXACT_SIZE;
count = h1m->curr_len;
+ }
+
+ if (h1c->conn->xprt->rcv_pipe && !!(flags & CO_RFL_MAY_SPLICE) && !(sdo->iobuf.flags & IOBUF_FL_NO_SPLICING))
+ nego_flags |= NEGO_FF_FL_MAY_SPLICE;
- try = se_nego_ff(sdo, &h1c->ibuf, count, h1c->conn->xprt->rcv_pipe && !!(flags & CO_RFL_MAY_SPLICE) && !(sdo->iobuf.flags & IOBUF_FL_NO_SPLICING));
+ try = se_nego_ff(sdo, &h1c->ibuf, count, nego_flags);
if (b_room(&h1c->ibuf) && (h1c->flags & H1C_F_IN_FULL)) {
h1c->flags &= ~H1C_F_IN_FULL;
TRACE_STATE("h1c ibuf not full anymore", H1_EV_STRM_RECV|H1_EV_H1C_BLK);
@@ -4848,6 +5100,10 @@ static int h1_ctl(struct connection *conn, enum mux_ctl_type mux_ctl, void *outp
if (!(h1c->wait_event.events & SUB_RETRY_RECV))
h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
return 0;
+ case MUX_CTL_GET_NBSTRM:
+ return h1_used_streams(conn);
+ case MUX_CTL_GET_MAXSTRM:
+ return 1;
default:
return -1;
}
@@ -5032,25 +5288,35 @@ static int add_hdr_case_adjust(const char *from, const char *to, char **err)
* Return 0 if successful, non-zero otherwise.
* Expected to be called with the old thread lock held.
*/
-static int h1_takeover(struct connection *conn, int orig_tid)
+static int h1_takeover(struct connection *conn, int orig_tid, int release)
{
struct h1c *h1c = conn->ctx;
struct task *task;
- struct task *new_task;
- struct tasklet *new_tasklet;
+ struct task *new_task = NULL;
+ struct tasklet *new_tasklet = NULL;
/* Pre-allocate tasks so that we don't have to roll back after the xprt
* has been migrated.
*/
- new_task = task_new_here();
- new_tasklet = tasklet_new();
- if (!new_task || !new_tasklet)
- goto fail;
+ if (!release) {
+ /* If the connection is attached to a buffer_wait (extremely
+ * rare), it will be woken up at any instant by its own thread
+ * and we can't undo it anyway, so let's give up on this one.
+ * It's not interesting anyway since it's not usable right now.
+ */
+ if (LIST_INLIST(&h1c->buf_wait.list))
+ goto fail;
+
+ new_task = task_new_here();
+ new_tasklet = tasklet_new();
+ if (!new_task || !new_tasklet)
+ goto fail;
+ }
if (fd_takeover(conn->handle.fd, conn) != 0)
goto fail;
- if (conn->xprt->takeover && conn->xprt->takeover(conn, conn->xprt_ctx, orig_tid) != 0) {
+ if (conn->xprt->takeover && conn->xprt->takeover(conn, conn->xprt_ctx, orig_tid, release) != 0) {
/* We failed to takeover the xprt, even if the connection may
* still be valid, flag it as error'd, as we have already
* taken over the fd, and wake the tasklet, so that it will
@@ -5077,8 +5343,10 @@ static int h1_takeover(struct connection *conn, int orig_tid)
h1c->task = new_task;
new_task = NULL;
- h1c->task->process = h1_timeout_task;
- h1c->task->context = h1c;
+ if (!release) {
+ h1c->task->process = h1_timeout_task;
+ h1c->task->context = h1c;
+ }
}
/* To let the tasklet know it should free itself, and do nothing else,
@@ -5088,10 +5356,26 @@ static int h1_takeover(struct connection *conn, int orig_tid)
tasklet_wakeup_on(h1c->wait_event.tasklet, orig_tid);
h1c->wait_event.tasklet = new_tasklet;
- h1c->wait_event.tasklet->process = h1_io_cb;
- h1c->wait_event.tasklet->context = h1c;
- h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx,
- SUB_RETRY_RECV, &h1c->wait_event);
+ if (!release) {
+ h1c->wait_event.tasklet->process = h1_io_cb;
+ h1c->wait_event.tasklet->context = h1c;
+ h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx,
+ SUB_RETRY_RECV, &h1c->wait_event);
+ }
+
+ if (release) {
+ /* we're being called for a server deletion and are running
+ * under thread isolation. That's the only way we can
+ * unregister a possible subscription of the original
+ * connection from its owner thread's queue, as this involves
+ * manipulating thread-unsafe areas. Note that it is not
+ * possible to just call b_dequeue() here as it would update
+ * the current thread's bufq_map and not the original one.
+ */
+ BUG_ON(!thread_isolated());
+ if (LIST_INLIST(&h1c->buf_wait.list))
+ _b_dequeue(&h1c->buf_wait, orig_tid);
+ }
if (new_task)
__task_free(new_task);
@@ -5321,8 +5605,7 @@ static const struct mux_ops mux_http_ops = {
.resume_fastfwd = h1_resume_fastfwd,
.subscribe = h1_subscribe,
.unsubscribe = h1_unsubscribe,
- .shutr = h1_shutr,
- .shutw = h1_shutw,
+ .shut = h1_shut,
.show_fd = h1_show_fd,
.show_sd = h1_show_sd,
.ctl = h1_ctl,
@@ -5349,8 +5632,7 @@ static const struct mux_ops mux_h1_ops = {
.resume_fastfwd = h1_resume_fastfwd,
.subscribe = h1_subscribe,
.unsubscribe = h1_unsubscribe,
- .shutr = h1_shutr,
- .shutw = h1_shutw,
+ .shut = h1_shut,
.show_fd = h1_show_fd,
.show_sd = h1_show_sd,
.ctl = h1_ctl,