diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-06-03 05:11:10 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-06-03 05:11:10 +0000 |
commit | cff6d757e3ba609c08ef2aaa00f07e53551e5bf6 (patch) | |
tree | 08c4fc3255483ad397d712edb4214ded49149fd9 /src/mux_fcgi.c | |
parent | Adding upstream version 2.9.7. (diff) | |
download | haproxy-cff6d757e3ba609c08ef2aaa00f07e53551e5bf6.tar.xz haproxy-cff6d757e3ba609c08ef2aaa00f07e53551e5bf6.zip |
Adding upstream version 3.0.0.upstream/3.0.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/mux_fcgi.c | 108 |
1 files changed, 67 insertions, 41 deletions
diff --git a/src/mux_fcgi.c b/src/mux_fcgi.c index 448d8bb..102a4f0 100644 --- a/src/mux_fcgi.c +++ b/src/mux_fcgi.c @@ -488,14 +488,14 @@ static int fcgi_buf_available(void *target) struct fcgi_conn *fconn = target; struct fcgi_strm *fstrm; - if ((fconn->flags & FCGI_CF_DEM_DALLOC) && b_alloc(&fconn->dbuf)) { + if ((fconn->flags & FCGI_CF_DEM_DALLOC) && b_alloc(&fconn->dbuf, DB_MUX_RX)) { TRACE_STATE("unblocking fconn, dbuf allocated", FCGI_EV_FCONN_RECV|FCGI_EV_FCONN_BLK|FCGI_EV_FCONN_WAKE, fconn->conn); fconn->flags &= ~FCGI_CF_DEM_DALLOC; fcgi_conn_restart_reading(fconn, 1); return 1; } - if ((fconn->flags & FCGI_CF_MUX_MALLOC) && b_alloc(br_tail(fconn->mbuf))) { + if ((fconn->flags & FCGI_CF_MUX_MALLOC) && b_alloc(br_tail(fconn->mbuf), DB_MUX_TX)) { TRACE_STATE("unblocking fconn, mbuf allocated", FCGI_EV_FCONN_SEND|FCGI_EV_FCONN_BLK|FCGI_EV_FCONN_WAKE, fconn->conn); fconn->flags &= ~FCGI_CF_MUX_MALLOC; if (fconn->flags & FCGI_CF_DEM_MROOM) { @@ -507,7 +507,7 @@ static int fcgi_buf_available(void *target) if ((fconn->flags & FCGI_CF_DEM_SALLOC) && (fstrm = fcgi_conn_st_by_id(fconn, fconn->dsi)) && fcgi_strm_sc(fstrm) && - b_alloc(&fstrm->rxbuf)) { + b_alloc(&fstrm->rxbuf, DB_SE_RX)) { TRACE_STATE("unblocking fstrm, rxbuf allocated", FCGI_EV_STRM_RECV|FCGI_EV_FSTRM_BLK|FCGI_EV_STRM_WAKE, fconn->conn, fstrm); fconn->flags &= ~FCGI_CF_DEM_SALLOC; fcgi_conn_restart_reading(fconn, 1); @@ -523,10 +523,8 @@ static inline struct buffer *fcgi_get_buf(struct fcgi_conn *fconn, struct buffer struct buffer *buf = NULL; if (likely(!LIST_INLIST(&fconn->buf_wait.list)) && - unlikely((buf = b_alloc(bptr)) == NULL)) { - fconn->buf_wait.target = fconn; - fconn->buf_wait.wakeup_cb = fcgi_buf_available; - LIST_APPEND(&th_ctx->buffer_wq, &fconn->buf_wait.list); + unlikely((buf = b_alloc(bptr, DB_MUX_RX)) == NULL)) { + b_queue(DB_MUX_RX, &fconn->buf_wait, fconn, fcgi_buf_available); } return buf; } @@ -755,8 +753,7 @@ static void fcgi_release(struct fcgi_conn *fconn) TRACE_POINT(FCGI_EV_FCONN_END); - if (LIST_INLIST(&fconn->buf_wait.list)) - LIST_DEL_INIT(&fconn->buf_wait.list); + b_dequeue(&fconn->buf_wait); fcgi_release_buf(fconn, &fconn->dbuf); fcgi_release_mbuf(fconn); @@ -3089,7 +3086,9 @@ static int fcgi_wake(struct connection *conn) static int fcgi_ctl(struct connection *conn, enum mux_ctl_type mux_ctl, void *output) { + struct fcgi_conn *fconn = conn->ctx; int ret = 0; + switch (mux_ctl) { case MUX_CTL_STATUS: if (!(conn->flags & CO_FL_WAIT_XPRT)) @@ -3097,6 +3096,10 @@ static int fcgi_ctl(struct connection *conn, enum mux_ctl_type mux_ctl, void *ou return ret; case MUX_CTL_EXIT_STATUS: return MUX_ES_UNKNOWN; + case MUX_CTL_GET_NBSTRM: + return fconn->nb_streams; + case MUX_CTL_GET_MAXSTRM: + return fconn->streams_limit; default: return -1; } @@ -3581,6 +3584,10 @@ static void fcgi_detach(struct sedesc *sd) } } if (eb_is_empty(&fconn->streams_by_id)) { + /* mark that the tasklet may lose its context to another thread and + * that the handler needs to check it under the idle conns lock. + */ + HA_ATOMIC_OR(&fconn->wait_event.tasklet->state, TASK_F_USR1); if (session_check_idle_conn(fconn->conn->owner, fconn->conn) != 0) { /* The connection is destroyed, let's leave */ TRACE_DEVEL("outgoing connection killed", FCGI_EV_STRM_END|FCGI_EV_FCONN_ERR); @@ -3619,7 +3626,7 @@ static void fcgi_detach(struct sedesc *sd) } else if (!fconn->conn->hash_node->node.node.leaf_p && fcgi_avail_streams(fconn->conn) > 0 && objt_server(fconn->conn->target) && - !LIST_INLIST(&fconn->conn->session_list)) { + !LIST_INLIST(&fconn->conn->sess_el)) { srv_add_to_avail_list(__objt_server(fconn->conn->target), fconn->conn); } } @@ -3787,24 +3794,16 @@ struct task *fcgi_deferred_shut(struct task *t, void *ctx, unsigned int state) return NULL; } -/* shutr() called by the stream connector (mux_ops.shutr) */ -static void fcgi_shutr(struct stconn *sc, enum co_shr_mode mode) -{ - struct fcgi_strm *fstrm = __sc_mux_strm(sc); - - TRACE_POINT(FCGI_EV_STRM_SHUT, fstrm->fconn->conn, fstrm); - if (!mode) - return; - fcgi_do_shutr(fstrm); -} - -/* shutw() called by the stream connector (mux_ops.shutw) */ -static void fcgi_shutw(struct stconn *sc, enum co_shw_mode mode) +static void fcgi_shut(struct stconn *sc, enum se_shut_mode mode, struct se_abort_info *reason) { struct fcgi_strm *fstrm = __sc_mux_strm(sc); - TRACE_POINT(FCGI_EV_STRM_SHUT, fstrm->fconn->conn, fstrm); - fcgi_do_shutw(fstrm); + TRACE_ENTER(FCGI_EV_STRM_SHUT, fstrm->fconn->conn, fstrm); + if (mode & (SE_SHW_SILENT|SE_SHW_NORMAL)) + fcgi_do_shutw(fstrm); + if (mode & SE_SHR_RESET) + fcgi_do_shutr(fstrm); + TRACE_LEAVE(FCGI_EV_STRM_SHUT, fstrm->fconn->conn, fstrm); } /* Called from the upper layer, to subscribe <es> to events <event_type>. The @@ -4163,25 +4162,35 @@ static int fcgi_show_fd(struct buffer *msg, struct connection *conn) * Return 0 if successful, non-zero otherwise. * Expected to be called with the old thread lock held. */ -static int fcgi_takeover(struct connection *conn, int orig_tid) +static int fcgi_takeover(struct connection *conn, int orig_tid, int release) { struct fcgi_conn *fcgi = conn->ctx; struct task *task; - struct task *new_task; - struct tasklet *new_tasklet; + struct task *new_task = NULL; + struct tasklet *new_tasklet = NULL; /* Pre-allocate tasks so that we don't have to roll back after the xprt * has been migrated. */ - new_task = task_new_here(); - new_tasklet = tasklet_new(); - if (!new_task || !new_tasklet) - goto fail; + if (!release) { + /* If the connection is attached to a buffer_wait (extremely + * rare), it will be woken up at any instant by its own thread + * and we can't undo it anyway, so let's give up on this one. + * It's not interesting anyway since it's not usable right now. + */ + if (LIST_INLIST(&fcgi->buf_wait.list)) + goto fail; + + new_task = task_new_here(); + new_tasklet = tasklet_new(); + if (!new_task || !new_tasklet) + goto fail; + } if (fd_takeover(conn->handle.fd, conn) != 0) goto fail; - if (conn->xprt->takeover && conn->xprt->takeover(conn, conn->xprt_ctx, orig_tid) != 0) { + if (conn->xprt->takeover && conn->xprt->takeover(conn, conn->xprt_ctx, orig_tid, release) != 0) { /* We failed to takeover the xprt, even if the connection may * still be valid, flag it as error'd, as we have already * taken over the fd, and wake the tasklet, so that it will @@ -4208,8 +4217,10 @@ static int fcgi_takeover(struct connection *conn, int orig_tid) fcgi->task = new_task; new_task = NULL; - fcgi->task->process = fcgi_timeout_task; - fcgi->task->context = fcgi; + if (!release) { + fcgi->task->process = fcgi_timeout_task; + fcgi->task->context = fcgi; + } } /* To let the tasklet know it should free itself, and do nothing else, @@ -4219,10 +4230,26 @@ static int fcgi_takeover(struct connection *conn, int orig_tid) tasklet_wakeup_on(fcgi->wait_event.tasklet, orig_tid); fcgi->wait_event.tasklet = new_tasklet; - fcgi->wait_event.tasklet->process = fcgi_io_cb; - fcgi->wait_event.tasklet->context = fcgi; - fcgi->conn->xprt->subscribe(fcgi->conn, fcgi->conn->xprt_ctx, - SUB_RETRY_RECV, &fcgi->wait_event); + if (!release) { + fcgi->wait_event.tasklet->process = fcgi_io_cb; + fcgi->wait_event.tasklet->context = fcgi; + fcgi->conn->xprt->subscribe(fcgi->conn, fcgi->conn->xprt_ctx, + SUB_RETRY_RECV, &fcgi->wait_event); + } + + if (release) { + /* we're being called for a server deletion and are running + * under thread isolation. That's the only way we can + * unregister a possible subscription of the original + * connection from its owner thread's queue, as this involves + * manipulating thread-unsafe areas. Note that it is not + * possible to just call b_dequeue() here as it would update + * the current thread's bufq_map and not the original one. + */ + BUG_ON(!thread_isolated()); + if (LIST_INLIST(&fcgi->buf_wait.list)) + _b_dequeue(&fcgi->buf_wait, orig_tid); + } if (new_task) __task_free(new_task); @@ -4252,8 +4279,7 @@ static const struct mux_ops mux_fcgi_ops = { .snd_buf = fcgi_snd_buf, .subscribe = fcgi_subscribe, .unsubscribe = fcgi_unsubscribe, - .shutr = fcgi_shutr, - .shutw = fcgi_shutw, + .shut = fcgi_shut, .ctl = fcgi_ctl, .sctl = fcgi_sctl, .show_fd = fcgi_show_fd, |