summaryrefslogtreecommitdiffstats
path: root/src/mux_fcgi.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-06-03 05:11:10 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-06-03 05:11:10 +0000
commitcff6d757e3ba609c08ef2aaa00f07e53551e5bf6 (patch)
tree08c4fc3255483ad397d712edb4214ded49149fd9 /src/mux_fcgi.c
parentAdding upstream version 2.9.7. (diff)
downloadhaproxy-cff6d757e3ba609c08ef2aaa00f07e53551e5bf6.tar.xz
haproxy-cff6d757e3ba609c08ef2aaa00f07e53551e5bf6.zip
Adding upstream version 3.0.0.upstream/3.0.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--src/mux_fcgi.c108
1 files changed, 67 insertions, 41 deletions
diff --git a/src/mux_fcgi.c b/src/mux_fcgi.c
index 448d8bb..102a4f0 100644
--- a/src/mux_fcgi.c
+++ b/src/mux_fcgi.c
@@ -488,14 +488,14 @@ static int fcgi_buf_available(void *target)
struct fcgi_conn *fconn = target;
struct fcgi_strm *fstrm;
- if ((fconn->flags & FCGI_CF_DEM_DALLOC) && b_alloc(&fconn->dbuf)) {
+ if ((fconn->flags & FCGI_CF_DEM_DALLOC) && b_alloc(&fconn->dbuf, DB_MUX_RX)) {
TRACE_STATE("unblocking fconn, dbuf allocated", FCGI_EV_FCONN_RECV|FCGI_EV_FCONN_BLK|FCGI_EV_FCONN_WAKE, fconn->conn);
fconn->flags &= ~FCGI_CF_DEM_DALLOC;
fcgi_conn_restart_reading(fconn, 1);
return 1;
}
- if ((fconn->flags & FCGI_CF_MUX_MALLOC) && b_alloc(br_tail(fconn->mbuf))) {
+ if ((fconn->flags & FCGI_CF_MUX_MALLOC) && b_alloc(br_tail(fconn->mbuf), DB_MUX_TX)) {
TRACE_STATE("unblocking fconn, mbuf allocated", FCGI_EV_FCONN_SEND|FCGI_EV_FCONN_BLK|FCGI_EV_FCONN_WAKE, fconn->conn);
fconn->flags &= ~FCGI_CF_MUX_MALLOC;
if (fconn->flags & FCGI_CF_DEM_MROOM) {
@@ -507,7 +507,7 @@ static int fcgi_buf_available(void *target)
if ((fconn->flags & FCGI_CF_DEM_SALLOC) &&
(fstrm = fcgi_conn_st_by_id(fconn, fconn->dsi)) && fcgi_strm_sc(fstrm) &&
- b_alloc(&fstrm->rxbuf)) {
+ b_alloc(&fstrm->rxbuf, DB_SE_RX)) {
TRACE_STATE("unblocking fstrm, rxbuf allocated", FCGI_EV_STRM_RECV|FCGI_EV_FSTRM_BLK|FCGI_EV_STRM_WAKE, fconn->conn, fstrm);
fconn->flags &= ~FCGI_CF_DEM_SALLOC;
fcgi_conn_restart_reading(fconn, 1);
@@ -523,10 +523,8 @@ static inline struct buffer *fcgi_get_buf(struct fcgi_conn *fconn, struct buffer
struct buffer *buf = NULL;
if (likely(!LIST_INLIST(&fconn->buf_wait.list)) &&
- unlikely((buf = b_alloc(bptr)) == NULL)) {
- fconn->buf_wait.target = fconn;
- fconn->buf_wait.wakeup_cb = fcgi_buf_available;
- LIST_APPEND(&th_ctx->buffer_wq, &fconn->buf_wait.list);
+ unlikely((buf = b_alloc(bptr, DB_MUX_RX)) == NULL)) {
+ b_queue(DB_MUX_RX, &fconn->buf_wait, fconn, fcgi_buf_available);
}
return buf;
}
@@ -755,8 +753,7 @@ static void fcgi_release(struct fcgi_conn *fconn)
TRACE_POINT(FCGI_EV_FCONN_END);
- if (LIST_INLIST(&fconn->buf_wait.list))
- LIST_DEL_INIT(&fconn->buf_wait.list);
+ b_dequeue(&fconn->buf_wait);
fcgi_release_buf(fconn, &fconn->dbuf);
fcgi_release_mbuf(fconn);
@@ -3089,7 +3086,9 @@ static int fcgi_wake(struct connection *conn)
static int fcgi_ctl(struct connection *conn, enum mux_ctl_type mux_ctl, void *output)
{
+ struct fcgi_conn *fconn = conn->ctx;
int ret = 0;
+
switch (mux_ctl) {
case MUX_CTL_STATUS:
if (!(conn->flags & CO_FL_WAIT_XPRT))
@@ -3097,6 +3096,10 @@ static int fcgi_ctl(struct connection *conn, enum mux_ctl_type mux_ctl, void *ou
return ret;
case MUX_CTL_EXIT_STATUS:
return MUX_ES_UNKNOWN;
+ case MUX_CTL_GET_NBSTRM:
+ return fconn->nb_streams;
+ case MUX_CTL_GET_MAXSTRM:
+ return fconn->streams_limit;
default:
return -1;
}
@@ -3581,6 +3584,10 @@ static void fcgi_detach(struct sedesc *sd)
}
}
if (eb_is_empty(&fconn->streams_by_id)) {
+ /* mark that the tasklet may lose its context to another thread and
+ * that the handler needs to check it under the idle conns lock.
+ */
+ HA_ATOMIC_OR(&fconn->wait_event.tasklet->state, TASK_F_USR1);
if (session_check_idle_conn(fconn->conn->owner, fconn->conn) != 0) {
/* The connection is destroyed, let's leave */
TRACE_DEVEL("outgoing connection killed", FCGI_EV_STRM_END|FCGI_EV_FCONN_ERR);
@@ -3619,7 +3626,7 @@ static void fcgi_detach(struct sedesc *sd)
}
else if (!fconn->conn->hash_node->node.node.leaf_p &&
fcgi_avail_streams(fconn->conn) > 0 && objt_server(fconn->conn->target) &&
- !LIST_INLIST(&fconn->conn->session_list)) {
+ !LIST_INLIST(&fconn->conn->sess_el)) {
srv_add_to_avail_list(__objt_server(fconn->conn->target), fconn->conn);
}
}
@@ -3787,24 +3794,16 @@ struct task *fcgi_deferred_shut(struct task *t, void *ctx, unsigned int state)
return NULL;
}
-/* shutr() called by the stream connector (mux_ops.shutr) */
-static void fcgi_shutr(struct stconn *sc, enum co_shr_mode mode)
-{
- struct fcgi_strm *fstrm = __sc_mux_strm(sc);
-
- TRACE_POINT(FCGI_EV_STRM_SHUT, fstrm->fconn->conn, fstrm);
- if (!mode)
- return;
- fcgi_do_shutr(fstrm);
-}
-
-/* shutw() called by the stream connector (mux_ops.shutw) */
-static void fcgi_shutw(struct stconn *sc, enum co_shw_mode mode)
+static void fcgi_shut(struct stconn *sc, enum se_shut_mode mode, struct se_abort_info *reason)
{
struct fcgi_strm *fstrm = __sc_mux_strm(sc);
- TRACE_POINT(FCGI_EV_STRM_SHUT, fstrm->fconn->conn, fstrm);
- fcgi_do_shutw(fstrm);
+ TRACE_ENTER(FCGI_EV_STRM_SHUT, fstrm->fconn->conn, fstrm);
+ if (mode & (SE_SHW_SILENT|SE_SHW_NORMAL))
+ fcgi_do_shutw(fstrm);
+ if (mode & SE_SHR_RESET)
+ fcgi_do_shutr(fstrm);
+ TRACE_LEAVE(FCGI_EV_STRM_SHUT, fstrm->fconn->conn, fstrm);
}
/* Called from the upper layer, to subscribe <es> to events <event_type>. The
@@ -4163,25 +4162,35 @@ static int fcgi_show_fd(struct buffer *msg, struct connection *conn)
* Return 0 if successful, non-zero otherwise.
* Expected to be called with the old thread lock held.
*/
-static int fcgi_takeover(struct connection *conn, int orig_tid)
+static int fcgi_takeover(struct connection *conn, int orig_tid, int release)
{
struct fcgi_conn *fcgi = conn->ctx;
struct task *task;
- struct task *new_task;
- struct tasklet *new_tasklet;
+ struct task *new_task = NULL;
+ struct tasklet *new_tasklet = NULL;
/* Pre-allocate tasks so that we don't have to roll back after the xprt
* has been migrated.
*/
- new_task = task_new_here();
- new_tasklet = tasklet_new();
- if (!new_task || !new_tasklet)
- goto fail;
+ if (!release) {
+ /* If the connection is attached to a buffer_wait (extremely
+ * rare), it will be woken up at any instant by its own thread
+ * and we can't undo it anyway, so let's give up on this one.
+ * It's not interesting anyway since it's not usable right now.
+ */
+ if (LIST_INLIST(&fcgi->buf_wait.list))
+ goto fail;
+
+ new_task = task_new_here();
+ new_tasklet = tasklet_new();
+ if (!new_task || !new_tasklet)
+ goto fail;
+ }
if (fd_takeover(conn->handle.fd, conn) != 0)
goto fail;
- if (conn->xprt->takeover && conn->xprt->takeover(conn, conn->xprt_ctx, orig_tid) != 0) {
+ if (conn->xprt->takeover && conn->xprt->takeover(conn, conn->xprt_ctx, orig_tid, release) != 0) {
/* We failed to takeover the xprt, even if the connection may
* still be valid, flag it as error'd, as we have already
* taken over the fd, and wake the tasklet, so that it will
@@ -4208,8 +4217,10 @@ static int fcgi_takeover(struct connection *conn, int orig_tid)
fcgi->task = new_task;
new_task = NULL;
- fcgi->task->process = fcgi_timeout_task;
- fcgi->task->context = fcgi;
+ if (!release) {
+ fcgi->task->process = fcgi_timeout_task;
+ fcgi->task->context = fcgi;
+ }
}
/* To let the tasklet know it should free itself, and do nothing else,
@@ -4219,10 +4230,26 @@ static int fcgi_takeover(struct connection *conn, int orig_tid)
tasklet_wakeup_on(fcgi->wait_event.tasklet, orig_tid);
fcgi->wait_event.tasklet = new_tasklet;
- fcgi->wait_event.tasklet->process = fcgi_io_cb;
- fcgi->wait_event.tasklet->context = fcgi;
- fcgi->conn->xprt->subscribe(fcgi->conn, fcgi->conn->xprt_ctx,
- SUB_RETRY_RECV, &fcgi->wait_event);
+ if (!release) {
+ fcgi->wait_event.tasklet->process = fcgi_io_cb;
+ fcgi->wait_event.tasklet->context = fcgi;
+ fcgi->conn->xprt->subscribe(fcgi->conn, fcgi->conn->xprt_ctx,
+ SUB_RETRY_RECV, &fcgi->wait_event);
+ }
+
+ if (release) {
+ /* we're being called for a server deletion and are running
+ * under thread isolation. That's the only way we can
+ * unregister a possible subscription of the original
+ * connection from its owner thread's queue, as this involves
+ * manipulating thread-unsafe areas. Note that it is not
+ * possible to just call b_dequeue() here as it would update
+ * the current thread's bufq_map and not the original one.
+ */
+ BUG_ON(!thread_isolated());
+ if (LIST_INLIST(&fcgi->buf_wait.list))
+ _b_dequeue(&fcgi->buf_wait, orig_tid);
+ }
if (new_task)
__task_free(new_task);
@@ -4252,8 +4279,7 @@ static const struct mux_ops mux_fcgi_ops = {
.snd_buf = fcgi_snd_buf,
.subscribe = fcgi_subscribe,
.unsubscribe = fcgi_unsubscribe,
- .shutr = fcgi_shutr,
- .shutw = fcgi_shutw,
+ .shut = fcgi_shut,
.ctl = fcgi_ctl,
.sctl = fcgi_sctl,
.show_fd = fcgi_show_fd,