diff options
Diffstat (limited to 'src/mux_h1.c')
-rw-r--r-- | src/mux_h1.c | 770 |
1 files changed, 526 insertions, 244 deletions
diff --git a/src/mux_h1.c b/src/mux_h1.c index 6593661..6bdaf71 100644 --- a/src/mux_h1.c +++ b/src/mux_h1.c @@ -227,7 +227,7 @@ enum { }; -static struct name_desc h1_stats[] = { +static struct stat_col h1_stats[] = { [H1_ST_OPEN_CONN] = { .name = "h1_open_connections", .desc = "Count of currently open connections" }, [H1_ST_OPEN_STREAM] = { .name = "h1_open_streams", @@ -264,21 +264,54 @@ static struct h1_counters { #endif } h1_counters; -static void h1_fill_stats(void *data, struct field *stats) +static int h1_fill_stats(void *data, struct field *stats, unsigned int *selected_field) { struct h1_counters *counters = data; + unsigned int current_field = (selected_field != NULL ? *selected_field : 0); - stats[H1_ST_OPEN_CONN] = mkf_u64(FN_GAUGE, counters->open_conns); - stats[H1_ST_OPEN_STREAM] = mkf_u64(FN_GAUGE, counters->open_streams); - stats[H1_ST_TOTAL_CONN] = mkf_u64(FN_COUNTER, counters->total_conns); - stats[H1_ST_TOTAL_STREAM] = mkf_u64(FN_COUNTER, counters->total_streams); + for (; current_field < H1_STATS_COUNT; current_field++) { + struct field metric = { 0 }; - stats[H1_ST_BYTES_IN] = mkf_u64(FN_COUNTER, counters->bytes_in); - stats[H1_ST_BYTES_OUT] = mkf_u64(FN_COUNTER, counters->bytes_out); + switch (current_field) { + case H1_ST_OPEN_CONN: + metric = mkf_u64(FN_GAUGE, counters->open_conns); + break; + case H1_ST_OPEN_STREAM: + metric = mkf_u64(FN_GAUGE, counters->open_streams); + break; + case H1_ST_TOTAL_CONN: + metric = mkf_u64(FN_COUNTER, counters->total_conns); + break; + case H1_ST_TOTAL_STREAM: + metric = mkf_u64(FN_COUNTER, counters->total_streams); + break; + case H1_ST_BYTES_IN: + metric = mkf_u64(FN_COUNTER, counters->bytes_in); + break; + case H1_ST_BYTES_OUT: + metric = mkf_u64(FN_COUNTER, counters->bytes_out); + break; #if defined(USE_LINUX_SPLICE) - stats[H1_ST_SPLICED_BYTES_IN] = mkf_u64(FN_COUNTER, counters->spliced_bytes_in); - stats[H1_ST_SPLICED_BYTES_OUT] = mkf_u64(FN_COUNTER, counters->spliced_bytes_out); + case H1_ST_SPLICED_BYTES_IN: + metric = mkf_u64(FN_COUNTER, counters->spliced_bytes_in); + break; + case H1_ST_SPLICED_BYTES_OUT: + metric = mkf_u64(FN_COUNTER, counters->spliced_bytes_out); + break; #endif + default: + /* not used for frontends. If a specific metric + * is requested, return an error. Otherwise continue. + */ + if (selected_field != NULL) + return 0; + continue; + } + stats[current_field] = metric; + if (selected_field != NULL) + break; + } + return 1; } static struct stats_module h1_stats_module = { @@ -302,6 +335,8 @@ DECLARE_STATIC_POOL(pool_head_h1s, "h1s", sizeof(struct h1s)); static int h1_recv(struct h1c *h1c); static int h1_send(struct h1c *h1c); static int h1_process(struct h1c *h1c); +static void h1_release(struct h1c *h1c); + /* h1_io_cb is exported to see it resolved in "show fd" */ struct task *h1_io_cb(struct task *t, void *ctx, unsigned int state); struct task *h1_timeout_task(struct task *t, void *context, unsigned int state); @@ -466,45 +501,91 @@ static int h1_buf_available(void *target) { struct h1c *h1c = target; - if ((h1c->flags & H1C_F_IN_ALLOC) && b_alloc(&h1c->ibuf)) { - TRACE_STATE("unblocking h1c, ibuf allocated", H1_EV_H1C_RECV|H1_EV_H1C_BLK|H1_EV_H1C_WAKE, h1c->conn); + if (h1c->flags & H1C_F_IN_ALLOC) { h1c->flags &= ~H1C_F_IN_ALLOC; - if (h1_recv_allowed(h1c)) - tasklet_wakeup(h1c->wait_event.tasklet); - return 1; + h1c->flags |= H1C_F_IN_MAYALLOC; } - if ((h1c->flags & H1C_F_OUT_ALLOC) && b_alloc(&h1c->obuf)) { - TRACE_STATE("unblocking h1s, obuf allocated", H1_EV_TX_DATA|H1_EV_H1S_BLK|H1_EV_STRM_WAKE, h1c->conn, h1c->h1s); + if ((h1c->flags & H1C_F_OUT_ALLOC) && h1c->h1s) { + TRACE_STATE("unblocking h1s, obuf allocatable", H1_EV_TX_DATA|H1_EV_H1S_BLK|H1_EV_STRM_WAKE, h1c->conn, h1c->h1s); h1c->flags &= ~H1C_F_OUT_ALLOC; - if (h1c->h1s) - h1_wake_stream_for_send(h1c->h1s); - return 1; + h1c->flags |= H1C_F_OUT_MAYALLOC; + h1_wake_stream_for_send(h1c->h1s); } - if ((h1c->flags & H1C_F_IN_SALLOC) && h1c->h1s && b_alloc(&h1c->h1s->rxbuf)) { - TRACE_STATE("unblocking h1c, stream rxbuf allocated", H1_EV_H1C_RECV|H1_EV_H1C_BLK|H1_EV_H1C_WAKE, h1c->conn); + if ((h1c->flags & H1C_F_IN_SALLOC) && h1c->h1s) { + TRACE_STATE("unblocking h1c, stream rxbuf allocatable", H1_EV_H1C_RECV|H1_EV_H1C_BLK|H1_EV_H1C_WAKE, h1c->conn); h1c->flags &= ~H1C_F_IN_SALLOC; + h1c->flags |= H1C_F_IN_SMAYALLOC; tasklet_wakeup(h1c->wait_event.tasklet); - return 1; } - return 0; + if ((h1c->flags & H1C_F_IN_MAYALLOC) && h1_recv_allowed(h1c)) { + TRACE_STATE("unblocking h1c, ibuf allocatable", H1_EV_H1C_RECV|H1_EV_H1C_BLK|H1_EV_H1C_WAKE, h1c->conn); + tasklet_wakeup(h1c->wait_event.tasklet); + } + + return 1; +} + +/* + * Allocate the h1c's ibuf. If if fails, it adds the mux in buffer wait queue, + * and sets the H1C_F_IN_ALLOC flag on the connection. It will advertise a more + * urgent allocation when a stream is already present than when none is present + * since in one case a buffer might be needed to permit to release another one, + * while in the other case we've simply not started anything. + */ +static inline struct buffer *h1_get_ibuf(struct h1c *h1c) +{ + struct buffer *buf; + + if (unlikely((buf = b_alloc(&h1c->ibuf, DB_MUX_RX | + ((h1c->flags & H1C_F_IN_MAYALLOC) ? DB_F_NOQUEUE : 0))) == NULL)) { + b_queue(DB_MUX_RX, &h1c->buf_wait, h1c, h1_buf_available); + h1c->flags |= H1C_F_IN_ALLOC; + } + else + h1c->flags &= ~H1C_F_IN_MAYALLOC; + + return buf; } /* - * Allocate a buffer. If if fails, it adds the mux in buffer wait queue. + * Allocate the h1c's obuf. If if fails, it adds the mux in buffer wait queue, + * and sets the H1C_F_OUT_ALLOC flag on the connection. */ -static inline struct buffer *h1_get_buf(struct h1c *h1c, struct buffer *bptr) +static inline struct buffer *h1_get_obuf(struct h1c *h1c) { - struct buffer *buf = NULL; + struct buffer *buf; - if (likely(!LIST_INLIST(&h1c->buf_wait.list)) && - unlikely((buf = b_alloc(bptr)) == NULL)) { - h1c->buf_wait.target = h1c; - h1c->buf_wait.wakeup_cb = h1_buf_available; - LIST_APPEND(&th_ctx->buffer_wq, &h1c->buf_wait.list); + if (unlikely((buf = b_alloc(&h1c->obuf, DB_MUX_TX | + ((h1c->flags & H1C_F_OUT_MAYALLOC) ? DB_F_NOQUEUE : 0))) == NULL)) { + b_queue(DB_MUX_TX, &h1c->buf_wait, h1c, h1_buf_available); + h1c->flags |= H1C_F_OUT_ALLOC; } + else + h1c->flags &= ~H1C_F_OUT_MAYALLOC; + + return buf; +} + +/* + * Allocate the h1s's rxbuf. If if fails, it adds the mux in buffer wait queue, + * and sets the H1C_F_IN_SALLOC flag on the connection. + */ +static inline struct buffer *h1_get_rxbuf(struct h1s *h1s) +{ + struct h1c *h1c = h1s->h1c; + struct buffer *buf; + + if (unlikely((buf = b_alloc(&h1s->rxbuf, DB_SE_RX | + ((h1c->flags & H1C_F_IN_SMAYALLOC) ? DB_F_NOQUEUE : 0))) == NULL)) { + b_queue(DB_SE_RX, &h1c->buf_wait, h1c, h1_buf_available); + h1c->flags |= H1C_F_IN_SALLOC; + } + else + h1c->flags &= ~H1C_F_IN_SMAYALLOC; + return buf; } @@ -521,11 +602,11 @@ static inline void h1_release_buf(struct h1c *h1c, struct buffer *bptr) } /* Returns 1 if the H1 connection is alive (IDLE, EMBRYONIC, RUNNING or - * RUNNING). Ortherwise 0 is returned. + * DRAINING). Ortherwise 0 is returned. */ static inline int h1_is_alive(const struct h1c *h1c) { - return (h1c->state <= H1_CS_RUNNING); + return (h1c->state <= H1_CS_DRAINING); } /* Switch the H1 connection to CLOSING or CLOSED mode, depending on the output @@ -869,7 +950,8 @@ static void h1s_destroy(struct h1s *h1s) h1_release_buf(h1c, &h1s->rxbuf); h1c->flags &= ~(H1C_F_WANT_FASTFWD| - H1C_F_OUT_FULL|H1C_F_OUT_ALLOC|H1C_F_IN_SALLOC| + H1C_F_OUT_FULL|H1C_F_OUT_ALLOC|H1C_F_OUT_MAYALLOC| + H1C_F_IN_SALLOC|H1C_F_IN_SMAYALLOC| H1C_F_CO_MSG_MORE|H1C_F_CO_STREAMER); if (!(h1c->flags & (H1C_F_EOS|H1C_F_ERR_PENDING|H1C_F_ERROR|H1C_F_ABRT_PENDING|H1C_F_ABRTED)) && /* No error/read0/abort */ @@ -893,6 +975,162 @@ static void h1s_destroy(struct h1s *h1s) } } + +/* Check if shutdown performed of an an H1S must lead to a connection shutdown + * of if it can be kept alive. It returns 1 if the connection must be shut down + * and 0 it if can be kept alive. + */ +static int h1s_must_shut_conn(struct h1s *h1s) +{ + struct h1c *h1c = h1s->h1c; + int ret; + + TRACE_ENTER(H1_EV_STRM_SHUT, h1c->conn, h1s); + + if (se_fl_test(h1s->sd, SE_FL_KILL_CONN)) { + TRACE_STATE("stream wants to kill the connection", H1_EV_STRM_SHUT, h1c->conn, h1s); + ret = 1; + } + else if (h1c->state == H1_CS_CLOSING || (h1c->flags & (H1C_F_EOS|H1C_F_ERR_PENDING|H1C_F_ERROR))) { + TRACE_STATE("shutdown on connection (EOS || CLOSING || ERROR)", H1_EV_STRM_SHUT, h1c->conn, h1s); + ret = 1; + } + else if (h1c->state == H1_CS_UPGRADING) { + TRACE_STATE("keep connection alive (UPGRADING)", H1_EV_STRM_SHUT, h1c->conn, h1s); + ret = 0; + } + else if (!(h1c->flags & H1C_F_IS_BACK) && h1s->req.state != H1_MSG_DONE && h1s->res.state == H1_MSG_DONE) { + TRACE_STATE("defer shutdown to drain request first", H1_EV_STRM_SHUT, h1c->conn, h1s); + ret = 0; + } + else if (((h1s->flags & H1S_F_WANT_KAL) && h1s->req.state == H1_MSG_DONE && h1s->res.state == H1_MSG_DONE)) { + TRACE_STATE("keep connection alive (want_kal)", H1_EV_STRM_SHUT, h1c->conn, h1s); + ret = 0; + } + else { + /* The default case, do the shutdown */ + ret = 1; + } + + TRACE_LEAVE(H1_EV_STRM_SHUT, h1c->conn, h1s); + return ret; +} + +/* Really detach the H1S. Most of time of it called from h1_detach() when the + * stream is detached from the connection. But if the request message must be + * drained first, the detach is deferred. + */ +static void h1s_finish_detach(struct h1s *h1s) +{ + struct h1c *h1c; + struct session *sess; + int is_not_first; + + TRACE_ENTER(H1_EV_STRM_END, h1s ? h1s->h1c->conn : NULL, h1s); + + sess = h1s->sess; + h1c = h1s->h1c; + + sess->accept_date = date; + sess->accept_ts = now_ns; + sess->t_handshake = 0; + sess->t_idle = -1; + + is_not_first = h1s->flags & H1S_F_NOT_FIRST; + h1s_destroy(h1s); + + if (h1c->state == H1_CS_IDLE && (h1c->flags & H1C_F_IS_BACK)) { + /* this connection may be killed at any moment, we want it to + * die "cleanly" (i.e. only an RST). + */ + h1c->flags |= H1C_F_SILENT_SHUT; + + /* If there are any excess server data in the input buffer, + * release it and close the connection ASAP (some data may + * remain in the output buffer). This happens if a server sends + * invalid responses. So in such case, we don't want to reuse + * the connection + */ + if (b_data(&h1c->ibuf)) { + h1_release_buf(h1c, &h1c->ibuf); + h1_close(h1c); + TRACE_DEVEL("remaining data on detach, kill connection", H1_EV_STRM_END|H1_EV_H1C_END); + goto release; + } + + if (h1c->conn->flags & CO_FL_PRIVATE) { + /* Add the connection in the session server list, if not already done */ + if (!session_add_conn(sess, h1c->conn, h1c->conn->target)) { + h1c->conn->owner = NULL; + h1c->conn->mux->destroy(h1c); + goto end; + } + /* Always idle at this step */ + + /* mark that the tasklet may lose its context to another thread and + * that the handler needs to check it under the idle conns lock. + */ + HA_ATOMIC_OR(&h1c->wait_event.tasklet->state, TASK_F_USR1); + if (session_check_idle_conn(sess, h1c->conn)) { + /* The connection got destroyed, let's leave */ + TRACE_DEVEL("outgoing connection killed", H1_EV_STRM_END|H1_EV_H1C_END); + goto end; + } + } + else { + if (h1c->conn->owner == sess) + h1c->conn->owner = NULL; + + /* mark that the tasklet may lose its context to another thread and + * that the handler needs to check it under the idle conns lock. + */ + HA_ATOMIC_OR(&h1c->wait_event.tasklet->state, TASK_F_USR1); + h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event); + xprt_set_idle(h1c->conn, h1c->conn->xprt, h1c->conn->xprt_ctx); + + if (!srv_add_to_idle_list(objt_server(h1c->conn->target), h1c->conn, is_not_first)) { + /* The server doesn't want it, let's kill the connection right away */ + h1c->conn->mux->destroy(h1c); + TRACE_DEVEL("outgoing connection killed", H1_EV_STRM_END|H1_EV_H1C_END); + goto end; + } + /* At this point, the connection has been added to the + * server idle list, so another thread may already have + * hijacked it, so we can't do anything with it. + */ + return; + } + } + + release: + /* We don't want to close right now unless the connection is in error or shut down for writes */ + if ((h1c->flags & H1C_F_ERROR) || + (h1c->state == H1_CS_CLOSED) || + (h1c->state == H1_CS_CLOSING && !b_data(&h1c->obuf)) || + !h1c->conn->owner) { + TRACE_DEVEL("killing dead connection", H1_EV_STRM_END, h1c->conn); + h1_release(h1c); + } + else { + if (h1c->state == H1_CS_IDLE) { + /* If we have a new request, process it immediately or + * subscribe for reads waiting for new data + */ + if (unlikely(b_data(&h1c->ibuf))) { + if (h1_process(h1c) == -1) + goto end; + } + else + h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event); + } + h1_set_idle_expiration(h1c); + h1_refresh_timeout(h1c); + } + end: + TRACE_LEAVE(H1_EV_STRM_END); +} + + /* * Initialize the mux once it's attached. It is expected that conn->ctx points * to the existing stream connector (for outgoing connections or for incoming @@ -1049,9 +1287,7 @@ static void h1_release(struct h1c *h1c) } - if (LIST_INLIST(&h1c->buf_wait.list)) - LIST_DEL_INIT(&h1c->buf_wait.list); - + b_dequeue(&h1c->buf_wait); h1_release_buf(h1c, &h1c->ibuf); h1_release_buf(h1c, &h1c->obuf); @@ -1416,21 +1652,33 @@ static void h1_capture_bad_message(struct h1c *h1c, struct h1s *h1s, &ctx, h1_show_error_snapshot); } -/* Emit the chunksize followed by a CRLF in front of data of the buffer +/* Emit the chunk size <chksz> followed by a CRLF in front of data of the buffer * <buf>. It goes backwards and starts with the byte before the buffer's * head. The caller is responsible for ensuring there is enough room left before - * the buffer's head for the string. + * the buffer's head for the string. if <length> is greater than 0, it + * represents the expected total length of the chunk size, including the + * CRLF. So it will be padded with 0 to resepct this length. It is the caller + * responsibility to pass the right value. if <length> is set to 0 (or less that + * the smallest size to represent the chunk size), it is ignored. */ -static void h1_prepend_chunk_size(struct buffer *buf, size_t chksz) +static void h1_prepend_chunk_size(struct buffer *buf, size_t chksz, size_t length) { char *beg, *end; beg = end = b_head(buf); *--beg = '\n'; *--beg = '\r'; + if (length) + length -= 2; do { *--beg = hextab[chksz & 0xF]; + if (length) + --length; } while (chksz >>= 4); + while (length) { + *--beg = '0'; + --length; + } buf->head -= (end - beg); b_add(buf, end - beg); } @@ -2328,15 +2576,47 @@ static size_t h1_make_eoh(struct h1s *h1s, struct h1m *h1m, struct htx *htx, siz b_slow_realign(&h1c->obuf, trash.area, b_data(&h1c->obuf)); outbuf = b_make(b_tail(&h1c->obuf), b_contig_space(&h1c->obuf), 0, 0); + /* Deal with removed "Content-Length" or "Transfer-Encoding" headers during analysis */ + if (((h1m->flags & H1_MF_CLEN) && !(h1s->flags & H1S_F_HAVE_CLEN))|| + ((h1m->flags & H1_MF_CHNK) && !(h1s->flags & H1S_F_HAVE_CHNK))) { + TRACE_STATE("\"Content-Length\" or \"Transfer-Encoding\" header removed during analysis", H1_EV_TX_DATA|H1_EV_TX_HDRS, h1c->conn, h1s); + + if (h1s->flags & (H1S_F_HAVE_CLEN|H1S_F_HAVE_CHNK)) { + /* At least on header is present, we can continue */ + if (!(h1s->flags & H1S_F_HAVE_CLEN)) { + h1m->curr_len = h1m->body_len = 0; + h1m->flags &= ~H1_MF_CLEN; + } + else /* h1s->flags & H1S_F_HAVE_CHNK */ + h1m->flags &= ~(H1_MF_XFER_ENC|H1_MF_CHNK); + } + else { + /* Both headers are missing */ + if (h1m->flags & H1_MF_RESP) { + /* It is a esponse: Switch to unknown xfer length */ + h1m->flags &= ~(H1_MF_XFER_LEN|H1_MF_XFER_ENC|H1_MF_CLEN|H1_MF_CHNK); + h1s->flags &= ~(H1S_F_HAVE_CLEN|H1S_F_HAVE_CHNK); + TRACE_STATE("Switch response to unknown XFER length", H1_EV_TX_DATA|H1_EV_TX_HDRS, h1c->conn, h1s); + } + else { + /* It is the request: Add "Content-Length: 0" header and skip payload */ + struct ist n = ist("content-length"); + if (h1c->px->options2 & (PR_O2_H1_ADJ_BUGCLI|PR_O2_H1_ADJ_BUGSRV)) + h1_adjust_case_outgoing_hdr(h1s, h1m, &n); + if (!h1_format_htx_hdr(n, ist("0"), &outbuf)) + goto full; + + h1m->flags = (h1m->flags & ~(H1_MF_XFER_ENC|H1_MF_CHNK)) | H1_MF_CLEN; + h1s->flags = (h1s->flags & ~H1S_F_HAVE_CHNK) | (H1S_F_HAVE_CLEN|H1S_F_BODYLESS_REQ); + h1m->curr_len = h1m->body_len = 0; + TRACE_STATE("Set request content-length to 0 and skip payload", H1_EV_TX_DATA|H1_EV_TX_HDRS, h1c->conn, h1s); + } + } + } + /* Deal with "Connection" header */ if (!(h1s->flags & H1S_F_HAVE_O_CONN)) { - if ((htx->flags & HTX_FL_PROXY_RESP) && h1s->req.state != H1_MSG_DONE) { - /* If the reply comes from haproxy while the request is - * not finished, we force the connection close. */ - h1s->flags = (h1s->flags & ~H1S_F_WANT_MSK) | H1S_F_WANT_CLO; - TRACE_STATE("force close mode (resp)", H1_EV_TX_DATA|H1_EV_TX_HDRS, h1s->h1c->conn, h1s); - } - else if ((h1m->flags & (H1_MF_XFER_ENC|H1_MF_CLEN)) == (H1_MF_XFER_ENC|H1_MF_CLEN)) { + if ((h1m->flags & (H1_MF_XFER_ENC|H1_MF_CLEN)) == (H1_MF_XFER_ENC|H1_MF_CLEN)) { /* T-E + C-L: force close */ h1s->flags = (h1s->flags & ~H1S_F_WANT_MSK) | H1S_F_WANT_CLO; h1m->flags &= ~H1_MF_CLEN; @@ -2384,23 +2664,6 @@ static size_t h1_make_eoh(struct h1s *h1s, struct h1m *h1m, struct htx *htx, siz h1s->flags |= H1S_F_HAVE_CHNK; } - /* Deal with "Content-Length header */ - if ((h1m->flags & H1_MF_CLEN) && !(h1s->flags & H1S_F_HAVE_CLEN)) { - char *end; - - h1m->curr_len = h1m->body_len = htx->data + htx->extra - sz; - end = DISGUISE(ulltoa(h1m->body_len, trash.area, b_size(&trash))); - - n = ist("content-length"); - v = ist2(trash.area, end-trash.area); - if (h1c->px->options2 & (PR_O2_H1_ADJ_BUGCLI|PR_O2_H1_ADJ_BUGSRV)) - h1_adjust_case_outgoing_hdr(h1s, h1m, &n); - if (!h1_format_htx_hdr(n, v, &outbuf)) - goto full; - TRACE_STATE("add \"Content-Length: <LEN>\"", H1_EV_TX_DATA|H1_EV_TX_HDRS, h1c->conn, h1s); - h1s->flags |= H1S_F_HAVE_CLEN; - } - /* Add the server name to a header (if requested) */ if (!(h1s->flags & H1S_F_HAVE_SRV_NAME) && !(h1m->flags & H1_MF_RESP) && isttest(h1c->px->server_id_hdr_name)) { @@ -2555,7 +2818,8 @@ static size_t h1_make_data(struct h1s *h1s, struct h1m *h1m, struct buffer *buf, * end-to-end. This is the situation that happens all the time with * large files. */ - if ((!(h1m->flags & H1_MF_RESP) || !(h1s->flags & H1S_F_BODYLESS_RESP)) && + if (((!(h1m->flags & H1_MF_RESP) && !(h1s->flags & H1S_F_BODYLESS_REQ)) || + ((h1m->flags & H1_MF_RESP) && !(h1s->flags & H1S_F_BODYLESS_RESP))) && !b_data(&h1c->obuf) && (!(h1m->flags & H1_MF_CHNK) || ((h1m->flags & H1_MF_CHNK) && (!h1m->curr_len || count == h1m->curr_len))) && htx_nbblks(htx) == 1 && @@ -2612,7 +2876,7 @@ static size_t h1_make_data(struct h1s *h1s, struct h1m *h1m, struct buffer *buf, /* Because chunk meta-data are prepended, the chunk size of the current chunk * must be handled before the end of the previous chunk. */ - h1_prepend_chunk_size(&h1c->obuf, h1m->curr_len); + h1_prepend_chunk_size(&h1c->obuf, h1m->curr_len, 0); if (h1m->state == H1_MSG_CHUNK_CRLF) h1_prepend_chunk_crlf(&h1c->obuf); @@ -2682,8 +2946,9 @@ static size_t h1_make_data(struct h1s *h1s, struct h1m *h1m, struct buffer *buf, last_data = 1; } - if ((h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_RESP)) { - TRACE_PROTO("Skip data for bodyless response", H1_EV_TX_DATA|H1_EV_TX_BODY, h1c->conn, h1s, htx); + if ((!(h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_REQ)) || + ((h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_RESP))) { + TRACE_PROTO("Skip data for bodyless message", H1_EV_TX_DATA|H1_EV_TX_BODY, h1c->conn, h1s, htx); goto nextblk; } @@ -2754,7 +3019,8 @@ static size_t h1_make_data(struct h1s *h1s, struct h1m *h1m, struct buffer *buf, } else if (type == HTX_BLK_EOT || type == HTX_BLK_TLR) { - if ((h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_RESP)) { + if ((!(h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_REQ)) || + ((h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_RESP))) { /* Do nothing the payload must be skipped * because it is a bodyless response */ @@ -2954,7 +3220,9 @@ static size_t h1_make_trailers(struct h1s *h1s, struct h1m *h1m, struct htx *htx if (sz > count) goto error; - if (!(h1m->flags & H1_MF_CHNK) || ((h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_RESP))) + if (!(h1m->flags & H1_MF_CHNK) || + (!(h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_REQ)) || + ((h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_RESP))) goto nextblk; n = htx_get_blk_name(htx, blk); @@ -2967,7 +3235,9 @@ static size_t h1_make_trailers(struct h1s *h1s, struct h1m *h1m, struct htx *htx goto full; } else if (type == HTX_BLK_EOT) { - if (!(h1m->flags & H1_MF_CHNK) || ((h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_RESP))) { + if (!(h1m->flags & H1_MF_CHNK) || + (!(h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_REQ)) || + ((h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_RESP))) { TRACE_PROTO((!(h1m->flags & H1_MF_RESP) ? "H1 request trailers skipped" : "H1 response trailers skipped"), H1_EV_TX_DATA|H1_EV_TX_TLRS, h1c->conn, h1s); } @@ -3023,8 +3293,7 @@ static size_t h1_make_chunk(struct h1s *h1s, struct h1m * h1m, size_t len) TRACE_ENTER(H1_EV_TX_DATA|H1_EV_TX_BODY, h1c->conn, h1s); - if (!h1_get_buf(h1c, &h1c->obuf)) { - h1c->flags |= H1C_F_OUT_ALLOC; + if (!h1_get_obuf(h1c)) { TRACE_STATE("waiting for h1c obuf allocation", H1_EV_TX_DATA|H1_EV_H1S_BLK, h1c->conn, h1s); goto end; } @@ -3077,8 +3346,7 @@ static size_t h1_process_mux(struct h1c *h1c, struct buffer *buf, size_t count) if (h1s->flags & (H1S_F_INTERNAL_ERROR|H1S_F_PROCESSING_ERROR|H1S_F_TX_BLK)) goto end; - if (!h1_get_buf(h1c, &h1c->obuf)) { - h1c->flags |= H1C_F_OUT_ALLOC; + if (!h1_get_obuf(h1c)) { TRACE_STATE("waiting for h1c obuf allocation", H1_EV_TX_DATA|H1_EV_H1S_BLK, h1c->conn, h1s); goto end; } @@ -3252,8 +3520,8 @@ static int h1_send_error(struct h1c *h1c) goto out; } - if (!h1_get_buf(h1c, &h1c->obuf)) { - h1c->flags |= (H1C_F_OUT_ALLOC|H1C_F_ABRT_PENDING); + if (!h1_get_obuf(h1c)) { + h1c->flags |= H1C_F_ABRT_PENDING; TRACE_STATE("waiting for h1c obuf allocation", H1_EV_H1C_ERR|H1_EV_H1C_BLK, h1c->conn); goto out; } @@ -3291,6 +3559,11 @@ static int h1_handle_internal_err(struct h1c *h1c) struct session *sess = h1c->conn->owner; int ret = 0; + if (h1c->state == H1_CS_DRAINING) { + h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED; + h1s_destroy(h1c->h1s); + goto end; + } session_inc_http_req_ctr(sess); proxy_inc_fe_req_ctr(sess->listener, sess->fe, 1); _HA_ATOMIC_INC(&sess->fe->fe_counters.p.http.rsp[5]); @@ -3301,6 +3574,7 @@ static int h1_handle_internal_err(struct h1c *h1c) h1c->errcode = 500; ret = h1_send_error(h1c); sess_log(sess); + end: return ret; } @@ -3314,6 +3588,11 @@ static int h1_handle_parsing_error(struct h1c *h1c) struct session *sess = h1c->conn->owner; int ret = 0; + if (h1c->state == H1_CS_DRAINING) { + h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED; + h1s_destroy(h1c->h1s); + goto end; + } if (!b_data(&h1c->ibuf) && ((h1c->flags & H1C_F_WAIT_NEXT_REQ) || (sess->fe->options & PR_O_IGNORE_PRB))) { h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED; h1_close(h1c); @@ -3347,6 +3626,11 @@ static int h1_handle_not_impl_err(struct h1c *h1c) struct session *sess = h1c->conn->owner; int ret = 0; + if (h1c->state == H1_CS_DRAINING) { + h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED; + h1s_destroy(h1c->h1s); + goto end; + } if (!b_data(&h1c->ibuf) && ((h1c->flags & H1C_F_WAIT_NEXT_REQ) || (sess->fe->options & PR_O_IGNORE_PRB))) { h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED; h1_close(h1c); @@ -3377,6 +3661,11 @@ static int h1_handle_req_tout(struct h1c *h1c) struct session *sess = h1c->conn->owner; int ret = 0; + if (h1c->state == H1_CS_DRAINING) { + h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED; + h1s_destroy(h1c->h1s); + goto end; + } if (!b_data(&h1c->ibuf) && ((h1c->flags & H1C_F_WAIT_NEXT_REQ) || (sess->fe->options & PR_O_IGNORE_PRB))) { h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED; h1_close(h1c); @@ -3421,8 +3710,7 @@ static int h1_recv(struct h1c *h1c) return 1; } - if (!h1_get_buf(h1c, &h1c->ibuf)) { - h1c->flags |= H1C_F_IN_ALLOC; + if (!h1_get_ibuf(h1c)) { TRACE_STATE("waiting for h1c ibuf allocation", H1_EV_H1C_RECV|H1_EV_H1C_BLK, h1c->conn); return 0; } @@ -3594,7 +3882,7 @@ static int h1_process(struct h1c * h1c) /* Try to parse now the first block of a request, creating the H1 stream if necessary */ if (b_data(&h1c->ibuf) && /* Input data to be processed */ - (h1c->state < H1_CS_RUNNING) && /* IDLE, EMBRYONIC or UPGRADING */ + ((h1c->state < H1_CS_RUNNING) || (h1c->state == H1_CS_DRAINING)) && /* IDLE, EMBRYONIC, UPGRADING or DRAINING */ !(h1c->flags & (H1C_F_IN_SALLOC|H1C_F_ABRT_PENDING))) { /* No allocation failure on the stream rxbuf and no ERROR on the H1C */ struct h1s *h1s = h1c->h1s; struct buffer *buf; @@ -3605,7 +3893,8 @@ static int h1_process(struct h1c * h1c) goto release; /* First of all handle H1 to H2 upgrade (no need to create the H1 stream) */ - if (!(h1c->flags & H1C_F_WAIT_NEXT_REQ) && /* First request */ + if (h1c->state != H1_CS_DRAINING && /* Not draining message */ + !(h1c->flags & H1C_F_WAIT_NEXT_REQ) && /* First request */ !(h1c->px->options2 & PR_O2_NO_H2_UPGRADE) && /* H2 upgrade supported by the proxy */ !(conn->mux->flags & MX_FL_NO_UPG)) { /* the current mux supports upgrades */ /* Try to match H2 preface before parsing the request headers. */ @@ -3635,9 +3924,8 @@ static int h1_process(struct h1c * h1c) h1s->sess->t_idle = ns_to_ms(now_ns - h1s->sess->accept_ts) - h1s->sess->t_handshake; /* Get the stream rxbuf */ - buf = h1_get_buf(h1c, &h1s->rxbuf); + buf = h1_get_rxbuf(h1s); if (!buf) { - h1c->flags |= H1C_F_IN_SALLOC; TRACE_STATE("waiting for stream rxbuf allocation", H1_EV_H1C_WAKE|H1_EV_H1C_BLK, h1c->conn); return 0; } @@ -3646,7 +3934,7 @@ static int h1_process(struct h1c * h1c) h1_process_demux(h1c, buf, count); h1_release_buf(h1c, &h1s->rxbuf); h1_set_idle_expiration(h1c); - if (h1c->state < H1_CS_RUNNING) { + if (h1c->state != H1_CS_RUNNING) { // TODO: be sure state cannot change in h1_process_demux if (h1s->flags & H1S_F_INTERNAL_ERROR) { h1_handle_internal_err(h1c); TRACE_ERROR("internal error detected", H1_EV_H1C_WAKE|H1_EV_H1C_ERR); @@ -3689,6 +3977,11 @@ static int h1_process(struct h1c * h1c) if (h1_send_error(h1c)) h1_send(h1c); } + else if (h1c->state == H1_CS_DRAINING) { + BUG_ON(h1c->h1s->sd && !se_fl_test(h1c->h1s->sd, SE_FL_ORPHAN)); + h1s_destroy(h1c->h1s); + TRACE_STATE("abort/error when draining message. destroy h1s and close h1c", H1_EV_H1S_END, h1c->conn); + } else { h1_close(h1c); TRACE_STATE("close h1c", H1_EV_H1S_END, h1c->conn); @@ -3717,6 +4010,17 @@ static int h1_process(struct h1c * h1c) h1_alert(h1s); } } + else if (h1c->state == H1_CS_DRAINING) { + BUG_ON(!h1c->h1s); + if (se_fl_test(h1c->h1s->sd, SE_FL_EOI)) { + if (h1s_must_shut_conn(h1c->h1s)) { + h1_shutw_conn(conn); + goto release; + } + h1s_finish_detach(h1c->h1s); + goto end; + } + } if (!b_data(&h1c->ibuf)) h1_release_buf(h1c, &h1c->ibuf); @@ -4025,8 +4329,6 @@ static void h1_detach(struct sedesc *sd) { struct h1s *h1s = sd->se; struct h1c *h1c; - struct session *sess; - int is_not_first; TRACE_ENTER(H1_EV_STRM_END, h1s ? h1s->h1c->conn : NULL, h1s); @@ -4034,149 +4336,47 @@ static void h1_detach(struct sedesc *sd) TRACE_LEAVE(H1_EV_STRM_END); return; } - - sess = h1s->sess; h1c = h1s->h1c; - sess->accept_date = date; - sess->accept_ts = now_ns; - sess->t_handshake = 0; - sess->t_idle = -1; - - is_not_first = h1s->flags & H1S_F_NOT_FIRST; - h1s_destroy(h1s); - - if (h1c->state == H1_CS_IDLE && (h1c->flags & H1C_F_IS_BACK)) { - /* this connection may be killed at any moment, we want it to - * die "cleanly" (i.e. only an RST). + if (h1c->state == H1_CS_RUNNING && !(h1c->flags & H1C_F_IS_BACK) && h1s->req.state != H1_MSG_DONE) { + h1c->state = H1_CS_DRAINING; + TRACE_DEVEL("Deferring H1S destroy to drain message", H1_EV_STRM_END, h1s->h1c->conn, h1s); + /* If we have a pending data, process it immediately or + * subscribe for reads waiting for new data */ - h1c->flags |= H1C_F_SILENT_SHUT; - - /* If there are any excess server data in the input buffer, - * release it and close the connection ASAP (some data may - * remain in the output buffer). This happens if a server sends - * invalid responses. So in such case, we don't want to reuse - * the connection - */ - if (b_data(&h1c->ibuf)) { - h1_release_buf(h1c, &h1c->ibuf); - h1_close(h1c); - TRACE_DEVEL("remaining data on detach, kill connection", H1_EV_STRM_END|H1_EV_H1C_END); - goto release; - } - - if (h1c->conn->flags & CO_FL_PRIVATE) { - /* Add the connection in the session server list, if not already done */ - if (!session_add_conn(sess, h1c->conn, h1c->conn->target)) { - h1c->conn->owner = NULL; - h1c->conn->mux->destroy(h1c); + if (unlikely(b_data(&h1c->ibuf))) { + if (h1_process(h1c) == -1) goto end; - } - /* Always idle at this step */ - if (session_check_idle_conn(sess, h1c->conn)) { - /* The connection got destroyed, let's leave */ - TRACE_DEVEL("outgoing connection killed", H1_EV_STRM_END|H1_EV_H1C_END); - goto end; - } } - else { - if (h1c->conn->owner == sess) - h1c->conn->owner = NULL; - - /* mark that the tasklet may lose its context to another thread and - * that the handler needs to check it under the idle conns lock. - */ - HA_ATOMIC_OR(&h1c->wait_event.tasklet->state, TASK_F_USR1); + else h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event); - xprt_set_idle(h1c->conn, h1c->conn->xprt, h1c->conn->xprt_ctx); - - if (!srv_add_to_idle_list(objt_server(h1c->conn->target), h1c->conn, is_not_first)) { - /* The server doesn't want it, let's kill the connection right away */ - h1c->conn->mux->destroy(h1c); - TRACE_DEVEL("outgoing connection killed", H1_EV_STRM_END|H1_EV_H1C_END); - goto end; - } - /* At this point, the connection has been added to the - * server idle list, so another thread may already have - * hijacked it, so we can't do anything with it. - */ - return; - } - } - - release: - /* We don't want to close right now unless the connection is in error or shut down for writes */ - if ((h1c->flags & H1C_F_ERROR) || - (h1c->state == H1_CS_CLOSED) || - (h1c->state == H1_CS_CLOSING && !b_data(&h1c->obuf)) || - !h1c->conn->owner) { - TRACE_DEVEL("killing dead connection", H1_EV_STRM_END, h1c->conn); - h1_release(h1c); - } - else { - if (h1c->state == H1_CS_IDLE) { - /* If we have a new request, process it immediately or - * subscribe for reads waiting for new data - */ - if (unlikely(b_data(&h1c->ibuf))) { - if (h1_process(h1c) == -1) - goto end; - } - else - h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event); - } h1_set_idle_expiration(h1c); h1_refresh_timeout(h1c); } + else + h1s_finish_detach(h1s); + end: TRACE_LEAVE(H1_EV_STRM_END); } - -static void h1_shutr(struct stconn *sc, enum co_shr_mode mode) +static void h1_shut(struct stconn *sc, enum se_shut_mode mode, struct se_abort_info *reason) { struct h1s *h1s = __sc_mux_strm(sc); struct h1c *h1c; - if (!h1s) - return; - h1c = h1s->h1c; - - TRACE_POINT(H1_EV_STRM_SHUT, h1c->conn, h1s, 0, (size_t[]){mode}); -} - -static void h1_shutw(struct stconn *sc, enum co_shw_mode mode) -{ - struct h1s *h1s = __sc_mux_strm(sc); - struct h1c *h1c; - - if (!h1s) + if (!h1s || !(mode & (SE_SHW_SILENT|SE_SHW_NORMAL))) return; h1c = h1s->h1c; TRACE_ENTER(H1_EV_STRM_SHUT, h1c->conn, h1s, 0, (size_t[]){mode}); - if (se_fl_test(h1s->sd, SE_FL_KILL_CONN)) { - TRACE_STATE("stream wants to kill the connection", H1_EV_STRM_SHUT, h1c->conn, h1s); - goto do_shutw; - } - if (h1c->state == H1_CS_CLOSING || (h1c->flags & (H1C_F_EOS|H1C_F_ERR_PENDING|H1C_F_ERROR))) { - TRACE_STATE("shutdown on connection (EOS || CLOSING || ERROR)", H1_EV_STRM_SHUT, h1c->conn, h1s); - goto do_shutw; - } - - if (h1c->state == H1_CS_UPGRADING) { - TRACE_STATE("keep connection alive (UPGRADING)", H1_EV_STRM_SHUT, h1c->conn, h1s); + if (!h1s_must_shut_conn(h1s)) goto end; - } - if (((h1s->flags & H1S_F_WANT_KAL) && h1s->req.state == H1_MSG_DONE && h1s->res.state == H1_MSG_DONE)) { - TRACE_STATE("keep connection alive (want_kal)", H1_EV_STRM_SHUT, h1c->conn, h1s); - goto end; - } do_shutw: h1_close(h1c); - if (mode != CO_SHW_NORMAL) + if (mode & SE_SHW_NORMAL) h1c->flags |= H1C_F_SILENT_SHUT; if (!b_data(&h1c->obuf)) @@ -4405,12 +4605,12 @@ static inline struct sedesc *h1s_opposite_sd(struct h1s *h1s) return sdo; } -static size_t h1_nego_ff(struct stconn *sc, struct buffer *input, size_t count, unsigned int may_splice) +static size_t h1_nego_ff(struct stconn *sc, struct buffer *input, size_t count, unsigned int flags) { struct h1s *h1s = __sc_mux_strm(sc); struct h1c *h1c = h1s->h1c; struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->res : &h1s->req); - size_t ret = 0; + size_t sz, offset = 0, ret = 0; TRACE_ENTER(H1_EV_STRM_SEND, h1c->conn, h1s, 0, (size_t[]){count}); @@ -4420,21 +4620,55 @@ static size_t h1_nego_ff(struct stconn *sc, struct buffer *input, size_t count, goto out; } - /* TODO: add check on curr_len if CLEN */ + if ((!(h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_REQ)) || + ((h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_RESP))) { + TRACE_STATE("Bodyless message, disable fastfwd", H1_EV_STRM_SEND|H1_EV_STRM_ERR, h1c->conn, h1s); + h1s->sd->iobuf.flags |= IOBUF_FL_NO_FF; + goto out; + } - if (h1m->flags & H1_MF_CHNK) { + if (h1m->flags & H1_MF_CLEN) { + if ((flags & NEGO_FF_FL_EXACT_SIZE) && count > h1m->curr_len) { + TRACE_ERROR("more payload than announced", H1_EV_STRM_SEND|H1_EV_STRM_ERR, h1c->conn, h1s); + h1s->sd->iobuf.flags |= IOBUF_FL_NO_FF; + goto out; + } + } + else if (h1m->flags & H1_MF_CHNK) { if (h1m->curr_len) { BUG_ON(h1m->state != H1_MSG_DATA); - if (count > h1m->curr_len) + if (count > h1m->curr_len) { + if ((flags & NEGO_FF_FL_EXACT_SIZE) && count > h1m->curr_len) { + TRACE_ERROR("chunk bigger than announced", H1_EV_STRM_SEND|H1_EV_STRM_ERR, h1c->conn, h1s); + h1s->sd->iobuf.flags |= IOBUF_FL_NO_FF; + goto out; + } count = h1m->curr_len; + } } else { BUG_ON(h1m->state != H1_MSG_CHUNK_CRLF && h1m->state != H1_MSG_CHUNK_SIZE); - if (!h1_make_chunk(h1s, h1m, count)) { + if (flags & NEGO_FF_FL_EXACT_SIZE) { + if (!h1_make_chunk(h1s, h1m, count)) { h1s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED; - goto out; + goto out; + } + h1m->curr_len = count; + } + else { + /* The producer does not know the chunk size, thus this will be emitted at the + * end, in done_ff(). So splicing cannot be used (see TODO below). + * We will reserve 10 bytes to handle at most 4Go chunk ! + * (<8-bytes SIZE><CRLF><CHUNK-DATA>) + */ + if (count > MAX_RANGE(unsigned int)) + count = MAX_RANGE(unsigned int); + offset = 10; + /* Add 2 more bytes to finish the previous chunk */ + if (h1m->state == H1_MSG_CHUNK_CRLF) + offset += 2; + goto no_splicing; } - h1m->curr_len = count; } } @@ -4445,7 +4679,7 @@ static size_t h1_nego_ff(struct stconn *sc, struct buffer *input, size_t count, * and then data in pipe, or the opposite. For now, it is not * supported to mix data. */ - if (!b_data(input) && !b_data(&h1c->obuf) && may_splice) { + if (!b_data(input) && !b_data(&h1c->obuf) && (flags & NEGO_FF_FL_MAY_SPLICE)) { #if defined(USE_LINUX_SPLICE) if (h1c->conn->xprt->snd_pipe && (h1s->sd->iobuf.pipe || (pipes_used < global.maxpipes && (h1s->sd->iobuf.pipe = get_pipe())))) { h1s->sd->iobuf.offset = 0; @@ -4458,8 +4692,8 @@ static size_t h1_nego_ff(struct stconn *sc, struct buffer *input, size_t count, TRACE_DEVEL("Unable to allocate pipe for splicing, fallback to buffer", H1_EV_STRM_SEND, h1c->conn, h1s); } - if (!h1_get_buf(h1c, &h1c->obuf)) { - h1c->flags |= H1C_F_OUT_ALLOC; + no_splicing: + if (!h1_get_obuf(h1c)) { h1s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED; TRACE_STATE("waiting for opposite h1c obuf allocation", H1_EV_STRM_SEND|H1_EV_H1S_BLK, h1c->conn, h1s); goto out; @@ -4468,21 +4702,22 @@ static size_t h1_nego_ff(struct stconn *sc, struct buffer *input, size_t count, if (b_space_wraps(&h1c->obuf)) b_slow_realign(&h1c->obuf, trash.area, b_data(&h1c->obuf)); - h1s->sd->iobuf.buf = &h1c->obuf; - h1s->sd->iobuf.offset = 0; - h1s->sd->iobuf.data = 0; - - /* Cannot forward more than available room in output buffer */ - if (count > b_room(&h1c->obuf)) - count = b_room(&h1c->obuf); - - if (!count) { + if (b_contig_space(&h1c->obuf) <= offset) { h1c->flags |= H1C_F_OUT_FULL; h1s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED; TRACE_STATE("output buffer full", H1_EV_STRM_SEND|H1_EV_H1S_BLK, h1c->conn, h1s); goto out; } + /* Cannot forward more than available room in output buffer */ + sz = b_contig_space(&h1c->obuf) - offset; + if (count > sz) + count = sz; + + h1s->sd->iobuf.buf = &h1c->obuf; + h1s->sd->iobuf.offset = offset; + h1s->sd->iobuf.data = 0; + /* forward remaining input data */ if (b_data(input)) { size_t xfer = count; @@ -4529,6 +4764,17 @@ static size_t h1_done_ff(struct stconn *sc) if (b_room(&h1c->obuf) == sd->iobuf.offset) h1c->flags |= H1C_F_OUT_FULL; + if (sd->iobuf.data && sd->iobuf.offset) { + struct buffer buf = b_make(b_orig(&h1c->obuf), b_size(&h1c->obuf), + b_peek_ofs(&h1c->obuf, b_data(&h1c->obuf) - sd->iobuf.data + sd->iobuf.offset), + sd->iobuf.data); + h1_prepend_chunk_size(&buf, sd->iobuf.data, sd->iobuf.offset - ((h1m->state == H1_MSG_CHUNK_CRLF) ? 2 : 0)); + if (h1m->state == H1_MSG_CHUNK_CRLF) + h1_prepend_chunk_crlf(&buf); + b_add(&h1c->obuf, sd->iobuf.offset); + h1m->state = H1_MSG_CHUNK_CRLF; + } + total = sd->iobuf.data; sd->iobuf.buf = NULL; sd->iobuf.offset = 0; @@ -4583,6 +4829,7 @@ static int h1_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags) struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->req : &h1s->res); struct sedesc *sdo = NULL; size_t total = 0, try = 0; + unsigned int nego_flags = NEGO_FF_FL_NONE; int ret = 0; TRACE_ENTER(H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){count}); @@ -4612,10 +4859,15 @@ static int h1_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags) retry: ret = 0; - if (h1m->state == H1_MSG_DATA && (h1m->flags & (H1_MF_CHNK|H1_MF_CLEN)) && count > h1m->curr_len) + if (h1m->state == H1_MSG_DATA && (h1m->flags & (H1_MF_CHNK|H1_MF_CLEN)) && count > h1m->curr_len) { + flags |= NEGO_FF_FL_EXACT_SIZE; count = h1m->curr_len; + } + + if (h1c->conn->xprt->rcv_pipe && !!(flags & CO_RFL_MAY_SPLICE) && !(sdo->iobuf.flags & IOBUF_FL_NO_SPLICING)) + nego_flags |= NEGO_FF_FL_MAY_SPLICE; - try = se_nego_ff(sdo, &h1c->ibuf, count, h1c->conn->xprt->rcv_pipe && !!(flags & CO_RFL_MAY_SPLICE) && !(sdo->iobuf.flags & IOBUF_FL_NO_SPLICING)); + try = se_nego_ff(sdo, &h1c->ibuf, count, nego_flags); if (b_room(&h1c->ibuf) && (h1c->flags & H1C_F_IN_FULL)) { h1c->flags &= ~H1C_F_IN_FULL; TRACE_STATE("h1c ibuf not full anymore", H1_EV_STRM_RECV|H1_EV_H1C_BLK); @@ -4848,6 +5100,10 @@ static int h1_ctl(struct connection *conn, enum mux_ctl_type mux_ctl, void *outp if (!(h1c->wait_event.events & SUB_RETRY_RECV)) h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event); return 0; + case MUX_CTL_GET_NBSTRM: + return h1_used_streams(conn); + case MUX_CTL_GET_MAXSTRM: + return 1; default: return -1; } @@ -5032,25 +5288,35 @@ static int add_hdr_case_adjust(const char *from, const char *to, char **err) * Return 0 if successful, non-zero otherwise. * Expected to be called with the old thread lock held. */ -static int h1_takeover(struct connection *conn, int orig_tid) +static int h1_takeover(struct connection *conn, int orig_tid, int release) { struct h1c *h1c = conn->ctx; struct task *task; - struct task *new_task; - struct tasklet *new_tasklet; + struct task *new_task = NULL; + struct tasklet *new_tasklet = NULL; /* Pre-allocate tasks so that we don't have to roll back after the xprt * has been migrated. */ - new_task = task_new_here(); - new_tasklet = tasklet_new(); - if (!new_task || !new_tasklet) - goto fail; + if (!release) { + /* If the connection is attached to a buffer_wait (extremely + * rare), it will be woken up at any instant by its own thread + * and we can't undo it anyway, so let's give up on this one. + * It's not interesting anyway since it's not usable right now. + */ + if (LIST_INLIST(&h1c->buf_wait.list)) + goto fail; + + new_task = task_new_here(); + new_tasklet = tasklet_new(); + if (!new_task || !new_tasklet) + goto fail; + } if (fd_takeover(conn->handle.fd, conn) != 0) goto fail; - if (conn->xprt->takeover && conn->xprt->takeover(conn, conn->xprt_ctx, orig_tid) != 0) { + if (conn->xprt->takeover && conn->xprt->takeover(conn, conn->xprt_ctx, orig_tid, release) != 0) { /* We failed to takeover the xprt, even if the connection may * still be valid, flag it as error'd, as we have already * taken over the fd, and wake the tasklet, so that it will @@ -5077,8 +5343,10 @@ static int h1_takeover(struct connection *conn, int orig_tid) h1c->task = new_task; new_task = NULL; - h1c->task->process = h1_timeout_task; - h1c->task->context = h1c; + if (!release) { + h1c->task->process = h1_timeout_task; + h1c->task->context = h1c; + } } /* To let the tasklet know it should free itself, and do nothing else, @@ -5088,10 +5356,26 @@ static int h1_takeover(struct connection *conn, int orig_tid) tasklet_wakeup_on(h1c->wait_event.tasklet, orig_tid); h1c->wait_event.tasklet = new_tasklet; - h1c->wait_event.tasklet->process = h1_io_cb; - h1c->wait_event.tasklet->context = h1c; - h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, - SUB_RETRY_RECV, &h1c->wait_event); + if (!release) { + h1c->wait_event.tasklet->process = h1_io_cb; + h1c->wait_event.tasklet->context = h1c; + h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, + SUB_RETRY_RECV, &h1c->wait_event); + } + + if (release) { + /* we're being called for a server deletion and are running + * under thread isolation. That's the only way we can + * unregister a possible subscription of the original + * connection from its owner thread's queue, as this involves + * manipulating thread-unsafe areas. Note that it is not + * possible to just call b_dequeue() here as it would update + * the current thread's bufq_map and not the original one. + */ + BUG_ON(!thread_isolated()); + if (LIST_INLIST(&h1c->buf_wait.list)) + _b_dequeue(&h1c->buf_wait, orig_tid); + } if (new_task) __task_free(new_task); @@ -5321,8 +5605,7 @@ static const struct mux_ops mux_http_ops = { .resume_fastfwd = h1_resume_fastfwd, .subscribe = h1_subscribe, .unsubscribe = h1_unsubscribe, - .shutr = h1_shutr, - .shutw = h1_shutw, + .shut = h1_shut, .show_fd = h1_show_fd, .show_sd = h1_show_sd, .ctl = h1_ctl, @@ -5349,8 +5632,7 @@ static const struct mux_ops mux_h1_ops = { .resume_fastfwd = h1_resume_fastfwd, .subscribe = h1_subscribe, .unsubscribe = h1_unsubscribe, - .shutr = h1_shutr, - .shutw = h1_shutw, + .shut = h1_shut, .show_fd = h1_show_fd, .show_sd = h1_show_sd, .ctl = h1_ctl, |