diff options
Diffstat (limited to 'src/mux_h2.c')
-rw-r--r-- | src/mux_h2.c | 233 |
1 files changed, 167 insertions, 66 deletions
diff --git a/src/mux_h2.c b/src/mux_h2.c index 7ce0e6e..c28c5e1 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -306,7 +306,7 @@ enum { H2_STATS_COUNT /* must be the last member of the enum */ }; -static struct name_desc h2_stats[] = { +static struct stat_col h2_stats[] = { [H2_ST_HEADERS_RCVD] = { .name = "h2_headers_rcvd", .desc = "Total number of received HEADERS frames" }, [H2_ST_DATA_RCVD] = { .name = "h2_data_rcvd", @@ -355,25 +355,67 @@ static struct h2_counters { long long total_streams; /* total number of streams */ } h2_counters; -static void h2_fill_stats(void *data, struct field *stats) +static int h2_fill_stats(void *data, struct field *stats, unsigned int *selected_field) { struct h2_counters *counters = data; + unsigned int current_field = (selected_field != NULL ? *selected_field : 0); - stats[H2_ST_HEADERS_RCVD] = mkf_u64(FN_COUNTER, counters->headers_rcvd); - stats[H2_ST_DATA_RCVD] = mkf_u64(FN_COUNTER, counters->data_rcvd); - stats[H2_ST_SETTINGS_RCVD] = mkf_u64(FN_COUNTER, counters->settings_rcvd); - stats[H2_ST_RST_STREAM_RCVD] = mkf_u64(FN_COUNTER, counters->rst_stream_rcvd); - stats[H2_ST_GOAWAY_RCVD] = mkf_u64(FN_COUNTER, counters->goaway_rcvd); - - stats[H2_ST_CONN_PROTO_ERR] = mkf_u64(FN_COUNTER, counters->conn_proto_err); - stats[H2_ST_STRM_PROTO_ERR] = mkf_u64(FN_COUNTER, counters->strm_proto_err); - stats[H2_ST_RST_STREAM_RESP] = mkf_u64(FN_COUNTER, counters->rst_stream_resp); - stats[H2_ST_GOAWAY_RESP] = mkf_u64(FN_COUNTER, counters->goaway_resp); - - stats[H2_ST_OPEN_CONN] = mkf_u64(FN_GAUGE, counters->open_conns); - stats[H2_ST_OPEN_STREAM] = mkf_u64(FN_GAUGE, counters->open_streams); - stats[H2_ST_TOTAL_CONN] = mkf_u64(FN_COUNTER, counters->total_conns); - stats[H2_ST_TOTAL_STREAM] = mkf_u64(FN_COUNTER, counters->total_streams); + for (; current_field < H2_STATS_COUNT; current_field++) { + struct field metric = { 0 }; + + switch (current_field) { + case H2_ST_HEADERS_RCVD: + metric = mkf_u64(FN_COUNTER, counters->headers_rcvd); + break; + case H2_ST_DATA_RCVD: + metric = mkf_u64(FN_COUNTER, counters->data_rcvd); + break; + case H2_ST_SETTINGS_RCVD: + metric = mkf_u64(FN_COUNTER, counters->settings_rcvd); + break; + case H2_ST_RST_STREAM_RCVD: + metric = mkf_u64(FN_COUNTER, counters->rst_stream_rcvd); + break; + case H2_ST_GOAWAY_RCVD: + metric = mkf_u64(FN_COUNTER, counters->goaway_rcvd); + break; + case H2_ST_CONN_PROTO_ERR: + metric = mkf_u64(FN_COUNTER, counters->conn_proto_err); + break; + case H2_ST_STRM_PROTO_ERR: + metric = mkf_u64(FN_COUNTER, counters->strm_proto_err); + break; + case H2_ST_RST_STREAM_RESP: + metric = mkf_u64(FN_COUNTER, counters->rst_stream_resp); + break; + case H2_ST_GOAWAY_RESP: + metric = mkf_u64(FN_COUNTER, counters->goaway_resp); + break; + case H2_ST_OPEN_CONN: + metric = mkf_u64(FN_GAUGE, counters->open_conns); + break; + case H2_ST_OPEN_STREAM: + metric = mkf_u64(FN_GAUGE, counters->open_streams); + break; + case H2_ST_TOTAL_CONN: + metric = mkf_u64(FN_COUNTER, counters->total_conns); + break; + case H2_ST_TOTAL_STREAM: + metric = mkf_u64(FN_COUNTER, counters->total_streams); + break; + default: + /* not used for frontends. If a specific metric + * is requested, return an error. Otherwise continue. + */ + if (selected_field != NULL) + return 0; + continue; + } + stats[current_field] = metric; + if (selected_field != NULL) + break; + } + return 1; } static struct stats_module h2_stats_module = { @@ -770,13 +812,13 @@ static int h2_buf_available(void *target) struct h2c *h2c = target; struct h2s *h2s; - if ((h2c->flags & H2_CF_DEM_DALLOC) && b_alloc(&h2c->dbuf)) { + if ((h2c->flags & H2_CF_DEM_DALLOC) && b_alloc(&h2c->dbuf, DB_MUX_RX)) { h2c->flags &= ~H2_CF_DEM_DALLOC; h2c_restart_reading(h2c, 1); return 1; } - if ((h2c->flags & H2_CF_MUX_MALLOC) && b_alloc(br_tail(h2c->mbuf))) { + if ((h2c->flags & H2_CF_MUX_MALLOC) && b_alloc(br_tail(h2c->mbuf), DB_MUX_TX)) { h2c->flags &= ~H2_CF_MUX_MALLOC; if (h2c->flags & H2_CF_DEM_MROOM) { @@ -788,7 +830,7 @@ static int h2_buf_available(void *target) if ((h2c->flags & H2_CF_DEM_SALLOC) && (h2s = h2c_st_by_id(h2c, h2c->dsi)) && h2s_sc(h2s) && - b_alloc(&h2s->rxbuf)) { + b_alloc(&h2s->rxbuf, DB_SE_RX)) { h2c->flags &= ~H2_CF_DEM_SALLOC; h2c_restart_reading(h2c, 1); return 1; @@ -802,10 +844,8 @@ static inline struct buffer *h2_get_buf(struct h2c *h2c, struct buffer *bptr) struct buffer *buf = NULL; if (likely(!LIST_INLIST(&h2c->buf_wait.list)) && - unlikely((buf = b_alloc(bptr)) == NULL)) { - h2c->buf_wait.target = h2c; - h2c->buf_wait.wakeup_cb = h2_buf_available; - LIST_APPEND(&th_ctx->buffer_wq, &h2c->buf_wait.list); + unlikely((buf = b_alloc(bptr, DB_MUX_RX)) == NULL)) { + b_queue(DB_MUX_RX, &h2c->buf_wait, h2c, h2_buf_available); } return buf; } @@ -1153,8 +1193,7 @@ static void h2_release(struct h2c *h2c) hpack_dht_free(h2c->ddht); - if (LIST_INLIST(&h2c->buf_wait.list)) - LIST_DEL_INIT(&h2c->buf_wait.list); + b_dequeue(&h2c->buf_wait); h2_release_buf(h2c, &h2c->dbuf); h2_release_mbuf(h2c); @@ -1222,6 +1261,20 @@ static inline int h2s_mws(const struct h2s *h2s) return h2s->sws + h2s->h2c->miw; } +/* Returns 1 if the H2 error of the opposite side is forwardable to the peer. + * Otherwise 0 is returned. + * For now, only CANCEL from the client is forwardable to the server. + */ +static inline int h2s_is_forwardable_abort(struct h2s *h2s, struct se_abort_info *reason) +{ + enum h2_err err = H2_ERR_NO_ERROR; + + if (reason && ((reason->info & SE_ABRT_SRC_MASK) >> SE_ABRT_SRC_SHIFT) == SE_ABRT_SRC_MUX_H2) + err = reason->code; + + return ((h2s->h2c->flags & H2_CF_IS_BACK) && (err == H2_ERR_CANCEL)); +} + /* marks an error on the connection. Before settings are sent, we must not send * a GOAWAY frame, and the error state will prevent h2c_send_goaway_error() * from verifying this so we set H2_CF_GOAWAY_FAILED to make sure it will not @@ -2770,6 +2823,10 @@ static int h2c_handle_rst_stream(struct h2c *h2c, struct h2s *h2s) if (h2s_sc(h2s)) { se_fl_set_error(h2s->sd); + if (!h2s->sd->abort_info.info) { + h2s->sd->abort_info.info = (SE_ABRT_SRC_MUX_H2 << SE_ABRT_SRC_SHIFT); + h2s->sd->abort_info.code = h2s->errcode; + } h2s_alert(h2s); } @@ -4344,8 +4401,13 @@ static int h2_process(struct h2c *h2c) if (!(h2c->flags & H2_CF_DEM_BLOCK_ANY) && (b_data(&h2c->dbuf) || (h2c->flags & H2_CF_RCVD_SHUT))) { + int prev_glitches = h2c->glitches; + h2_process_demux(h2c); + if (h2c->glitches != prev_glitches && !(h2c->flags & H2_CF_IS_BACK)) + session_add_glitch_ctr(h2c->conn->owner, h2c->glitches - prev_glitches); + if (h2c->st0 >= H2_CS_ERROR || (h2c->flags & H2_CF_ERROR)) b_reset(&h2c->dbuf); @@ -4664,6 +4726,12 @@ static int h2_ctl(struct connection *conn, enum mux_ctl_type mux_ctl, void *outp case MUX_CTL_GET_GLITCHES: return h2c->glitches; + case MUX_CTL_GET_NBSTRM: + return h2c->nb_streams; + + case MUX_CTL_GET_MAXSTRM: + return h2c->streams_limit; + default: return -1; } @@ -4772,6 +4840,10 @@ static void h2_detach(struct sedesc *sd) } } if (eb_is_empty(&h2c->streams_by_id)) { + /* mark that the tasklet may lose its context to another thread and + * that the handler needs to check it under the idle conns lock. + */ + HA_ATOMIC_OR(&h2c->wait_event.tasklet->state, TASK_F_USR1); if (session_check_idle_conn(h2c->conn->owner, h2c->conn) != 0) { /* At this point either the connection is destroyed, or it's been added to the server idle list, just stop */ TRACE_DEVEL("leaving without reusable idle connection", H2_EV_STRM_END); @@ -4811,7 +4883,7 @@ static void h2_detach(struct sedesc *sd) } else if (!h2c->conn->hash_node->node.node.leaf_p && h2_avail_streams(h2c->conn) > 0 && objt_server(h2c->conn->target) && - !LIST_INLIST(&h2c->conn->session_list)) { + !LIST_INLIST(&h2c->conn->sess_el)) { srv_add_to_avail_list(__objt_server(h2c->conn->target), h2c->conn); } } @@ -4837,7 +4909,7 @@ static void h2_detach(struct sedesc *sd) } /* Performs a synchronous or asynchronous shutr(). */ -static void h2_do_shutr(struct h2s *h2s) +static void h2_do_shutr(struct h2s *h2s, struct se_abort_info *reason) { struct h2c *h2c = h2s->h2c; @@ -4860,6 +4932,10 @@ static void h2_do_shutr(struct h2s *h2s) h2c_error(h2c, H2_ERR_ENHANCE_YOUR_CALM); h2s_error(h2s, H2_ERR_ENHANCE_YOUR_CALM); } + else if (h2s_is_forwardable_abort(h2s, reason)) { + TRACE_STATE("shutr using opposite endp code", H2_EV_STRM_SHUT, h2c->conn, h2s); + h2s_error(h2s, reason->code); + } else if (!(h2s->flags & H2_SF_HEADERS_SENT)) { /* Nothing was never sent for this stream, so reset with * REFUSED_STREAM error to let the client retry the @@ -4905,8 +4981,9 @@ add_to_list: return; } + /* Performs a synchronous or asynchronous shutw(). */ -static void h2_do_shutw(struct h2s *h2s) +static void h2_do_shutw(struct h2s *h2s, struct se_abort_info *reason) { struct h2c *h2c = h2s->h2c; @@ -4916,6 +4993,7 @@ static void h2_do_shutw(struct h2s *h2s) TRACE_ENTER(H2_EV_STRM_SHUT, h2c->conn, h2s); if (h2s->st != H2_SS_ERROR && + !h2s_is_forwardable_abort(h2s, reason) && (h2s->flags & (H2_SF_HEADERS_SENT | H2_SF_MORE_HTX_DATA)) == H2_SF_HEADERS_SENT) { /* we can cleanly close using an empty data frame only after headers * and if no more data is expected to be sent. @@ -4940,6 +5018,10 @@ static void h2_do_shutw(struct h2s *h2s) h2c_error(h2c, H2_ERR_ENHANCE_YOUR_CALM); h2s_error(h2s, H2_ERR_ENHANCE_YOUR_CALM); } + else if (h2s_is_forwardable_abort(h2s, reason)) { + TRACE_STATE("shutw using opposite endp code", H2_EV_STRM_SHUT, h2c->conn, h2s); + h2s_error(h2s, reason->code); + } else if (h2s->flags & H2_SF_MORE_HTX_DATA) { /* some unsent data were pending (e.g. abort during an upload), * let's send a CANCEL. @@ -5006,10 +5088,10 @@ struct task *h2_deferred_shut(struct task *t, void *ctx, unsigned int state) } if (h2s->flags & H2_SF_WANT_SHUTW) - h2_do_shutw(h2s); + h2_do_shutw(h2s, NULL); if (h2s->flags & H2_SF_WANT_SHUTR) - h2_do_shutr(h2s); + h2_do_shutr(h2s, NULL); if (!(h2s->flags & (H2_SF_WANT_SHUTR|H2_SF_WANT_SHUTW))) { /* We're done trying to send, remove ourself from the send_list */ @@ -5028,24 +5110,17 @@ struct task *h2_deferred_shut(struct task *t, void *ctx, unsigned int state) return t; } -/* shutr() called by the stream connector (mux_ops.shutr) */ -static void h2_shutr(struct stconn *sc, enum co_shr_mode mode) -{ - struct h2s *h2s = __sc_mux_strm(sc); - - TRACE_ENTER(H2_EV_STRM_SHUT, h2s->h2c->conn, h2s); - if (mode) - h2_do_shutr(h2s); - TRACE_LEAVE(H2_EV_STRM_SHUT, h2s->h2c->conn, h2s); -} - -/* shutw() called by the stream connector (mux_ops.shutw) */ -static void h2_shutw(struct stconn *sc, enum co_shw_mode mode) +static void h2_shut(struct stconn *sc, enum se_shut_mode mode, struct se_abort_info *reason) { struct h2s *h2s = __sc_mux_strm(sc); TRACE_ENTER(H2_EV_STRM_SHUT, h2s->h2c->conn, h2s); - h2_do_shutw(h2s); + if (mode & (SE_SHW_SILENT|SE_SHW_NORMAL)) { + /* Pass the reason for silent shutw only (abort) */ + h2_do_shutw(h2s, (mode & SE_SHW_SILENT) ? reason : NULL); + } + if (mode & SE_SHR_RESET) + h2_do_shutr(h2s, reason); TRACE_LEAVE(H2_EV_STRM_SHUT, h2s->h2c->conn, h2s); } @@ -6197,10 +6272,9 @@ static size_t h2s_snd_bhdrs(struct h2s *h2s, struct htx *htx) } /* Try to send a DATA frame matching HTTP response present in HTX structure - * present in <buf>, for stream <h2s>. Returns the number of bytes sent. The - * caller must check the stream's status to detect any error which might have - * happened subsequently to a successful send. Returns the number of data bytes - * consumed, or zero if nothing done. + * present in <buf>, for stream <h2s>. The caller must check the stream's status + * to detect any error which might have happened subsequently to a successful + * send. Returns the number of data bytes consumed, or zero if nothing done. */ static size_t h2s_make_data(struct h2s *h2s, struct buffer *buf, size_t count) { @@ -7095,7 +7169,7 @@ static size_t h2_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, in return total; } -static size_t h2_nego_ff(struct stconn *sc, struct buffer *input, size_t count, unsigned int may_splice) +static size_t h2_nego_ff(struct stconn *sc, struct buffer *input, size_t count, unsigned int flags) { struct h2s *h2s = __sc_mux_strm(sc); struct h2c *h2c = h2s->h2c; @@ -7454,25 +7528,35 @@ static int h2_show_sd(struct buffer *msg, struct sedesc *sd, const char *pfx) * Return 0 if successful, non-zero otherwise. * Expected to be called with the old thread lock held. */ -static int h2_takeover(struct connection *conn, int orig_tid) +static int h2_takeover(struct connection *conn, int orig_tid, int release) { struct h2c *h2c = conn->ctx; struct task *task; - struct task *new_task; - struct tasklet *new_tasklet; + struct task *new_task = NULL; + struct tasklet *new_tasklet = NULL; /* Pre-allocate tasks so that we don't have to roll back after the xprt * has been migrated. */ - new_task = task_new_here(); - new_tasklet = tasklet_new(); - if (!new_task || !new_tasklet) - goto fail; + if (!release) { + /* If the connection is attached to a buffer_wait (extremely + * rare), it will be woken up at any instant by its own thread + * and we can't undo it anyway, so let's give up on this one. + * It's not interesting anyway since it's not usable right now. + */ + if (LIST_INLIST(&h2c->buf_wait.list)) + goto fail; + + new_task = task_new_here(); + new_tasklet = tasklet_new(); + if (!new_task || !new_tasklet) + goto fail; + } if (fd_takeover(conn->handle.fd, conn) != 0) goto fail; - if (conn->xprt->takeover && conn->xprt->takeover(conn, conn->xprt_ctx, orig_tid) != 0) { + if (conn->xprt->takeover && conn->xprt->takeover(conn, conn->xprt_ctx, orig_tid, release) != 0) { /* We failed to takeover the xprt, even if the connection may * still be valid, flag it as error'd, as we have already * taken over the fd, and wake the tasklet, so that it will @@ -7499,8 +7583,10 @@ static int h2_takeover(struct connection *conn, int orig_tid) h2c->task = new_task; new_task = NULL; - h2c->task->process = h2_timeout_task; - h2c->task->context = h2c; + if (!release) { + h2c->task->process = h2_timeout_task; + h2c->task->context = h2c; + } } /* To let the tasklet know it should free itself, and do nothing else, @@ -7510,10 +7596,26 @@ static int h2_takeover(struct connection *conn, int orig_tid) tasklet_wakeup_on(h2c->wait_event.tasklet, orig_tid); h2c->wait_event.tasklet = new_tasklet; - h2c->wait_event.tasklet->process = h2_io_cb; - h2c->wait_event.tasklet->context = h2c; - h2c->conn->xprt->subscribe(h2c->conn, h2c->conn->xprt_ctx, - SUB_RETRY_RECV, &h2c->wait_event); + if (!release) { + h2c->wait_event.tasklet->process = h2_io_cb; + h2c->wait_event.tasklet->context = h2c; + h2c->conn->xprt->subscribe(h2c->conn, h2c->conn->xprt_ctx, + SUB_RETRY_RECV, &h2c->wait_event); + } + + if (release) { + /* we're being called for a server deletion and are running + * under thread isolation. That's the only way we can + * unregister a possible subscription of the original + * connection from its owner thread's queue, as this involves + * manipulating thread-unsafe areas. Note that it is not + * possible to just call b_dequeue() here as it would update + * the current thread's bufq_map and not the original one. + */ + BUG_ON(!thread_isolated()); + if (LIST_INLIST(&h2c->buf_wait.list)) + _b_dequeue(&h2c->buf_wait, orig_tid); + } if (new_task) __task_free(new_task); @@ -7690,8 +7792,7 @@ static const struct mux_ops h2_ops = { .destroy = h2_destroy, .avail_streams = h2_avail_streams, .used_streams = h2_used_streams, - .shutr = h2_shutr, - .shutw = h2_shutw, + .shut = h2_shut, .ctl = h2_ctl, .sctl = h2_sctl, .show_fd = h2_show_fd, |