summaryrefslogtreecommitdiffstats
path: root/src/stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream.c')
-rw-r--r--src/stream.c90
1 files changed, 39 insertions, 51 deletions
diff --git a/src/stream.c b/src/stream.c
index e643a6d..ed5c268 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -320,15 +320,13 @@ int stream_buf_available(void *arg)
{
struct stream *s = arg;
- if (!s->req.buf.size && !sc_ep_have_ff_data(s->scb) && s->scf->flags & SC_FL_NEED_BUFF &&
- b_alloc(&s->req.buf))
+ if (!s->req.buf.size && !sc_ep_have_ff_data(s->scb) && s->scf->flags & SC_FL_NEED_BUFF)
sc_have_buff(s->scf);
- else if (!s->res.buf.size && !sc_ep_have_ff_data(s->scf) && s->scb->flags & SC_FL_NEED_BUFF &&
- b_alloc(&s->res.buf))
+
+ if (!s->res.buf.size && !sc_ep_have_ff_data(s->scf) && s->scb->flags & SC_FL_NEED_BUFF)
sc_have_buff(s->scb);
- else
- return 0;
+ s->flags |= SF_MAYALLOC;
task_wakeup(s->task, TASK_WOKEN_RES);
return 1;
@@ -632,8 +630,7 @@ void stream_free(struct stream *s)
}
/* We may still be present in the buffer wait queue */
- if (LIST_INLIST(&s->buffer_wait.list))
- LIST_DEL_INIT(&s->buffer_wait.list);
+ b_dequeue(&s->buffer_wait);
if (s->req.buf.size || s->res.buf.size) {
int count = !!s->req.buf.size + !!s->res.buf.size;
@@ -752,8 +749,12 @@ void stream_free(struct stream *s)
*/
static int stream_alloc_work_buffer(struct stream *s)
{
- if (b_alloc(&s->res.buf))
+ if (b_alloc(&s->res.buf, DB_CHANNEL | ((s->flags & SF_MAYALLOC) ? DB_F_NOQUEUE : 0))) {
+ s->flags &= ~SF_MAYALLOC;
return 1;
+ }
+
+ b_requeue(DB_CHANNEL, &s->buffer_wait);
return 0;
}
@@ -920,7 +921,7 @@ void back_establish(struct stream *s)
if (!IS_HTX_STRM(s)) { /* let's allow immediate data connection in this case */
/* if the user wants to log as soon as possible, without counting
* bytes from the server, then this is the right moment. */
- if (!LIST_ISEMPTY(&strm_fe(s)->logformat) && !(s->logs.logwait & LW_BYTES)) {
+ if (!lf_expr_isempty(&strm_fe(s)->logformat) && !(s->logs.logwait & LW_BYTES)) {
/* note: no pend_pos here, session is established */
s->logs.t_close = s->logs.t_connect; /* to get a valid end date */
s->do_log(s);
@@ -1736,8 +1737,8 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
scb = s->scb;
/* First, attempt to receive pending data from I/O layers */
- sc_conn_sync_recv(scf);
- sc_conn_sync_recv(scb);
+ sc_sync_recv(scf);
+ sc_sync_recv(scb);
/* Let's check if we're looping without making any progress, e.g. due
* to a bogus analyser or the fact that we're ignoring a read0. The
@@ -1794,25 +1795,12 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
}
resync_stconns:
- /* below we may emit error messages so we have to ensure that we have
- * our buffers properly allocated. If the allocation failed, an error is
- * triggered.
- *
- * NOTE: An error is returned because the mechanism to queue entities
- * waiting for a buffer is totally broken for now. However, this
- * part must be refactored. When it will be handled, this part
- * must be be reviewed too.
- */
if (!stream_alloc_work_buffer(s)) {
- scf->flags |= SC_FL_ERROR;
- s->conn_err_type = STRM_ET_CONN_RES;
-
- scb->flags |= SC_FL_ERROR;
- s->conn_err_type = STRM_ET_CONN_RES;
-
- if (!(s->flags & SF_ERR_MASK))
- s->flags |= SF_ERR_RESOURCE;
- sess_set_term_flags(s);
+ scf->flags &= ~SC_FL_DONT_WAKE;
+ scb->flags &= ~SC_FL_DONT_WAKE;
+ /* we're stuck for now */
+ t->expire = TICK_ETERNITY;
+ goto leave;
}
/* 1b: check for low-level errors reported at the stream connector.
@@ -2349,7 +2337,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
}
/* Let's see if we can send the pending request now */
- sc_conn_sync_send(scb);
+ sc_sync_send(scb);
/*
* Now forward all shutdown requests between both sides of the request buffer
@@ -2459,7 +2447,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
scf_flags = (scf_flags & ~(SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) | (scf->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED));
/* Let's see if we can send the pending response now */
- sc_conn_sync_send(scf);
+ sc_sync_send(scf);
/*
* Now forward all shutdown requests between both sides of the buffer
@@ -2552,7 +2540,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
stream_handle_timeouts(s);
goto resync_stconns;
}
-
+ leave:
s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES);
stream_release_buffers(s);
@@ -2597,7 +2585,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
}
/* let's do a final log if we need it */
- if (!LIST_ISEMPTY(&sess->fe->logformat) && s->logs.logwait &&
+ if (!lf_expr_isempty(&sess->fe->logformat) && s->logs.logwait &&
!(s->flags & SF_MONITOR) &&
(!(sess->fe->options & PR_O_NULLNOLOG) || req->total)) {
/* we may need to know the position in the queue */
@@ -2847,7 +2835,7 @@ INITCALL0(STG_INIT, init_stream);
* If an ID is already stored within the stream nothing happens existing unique ID is
* returned.
*/
-struct ist stream_generate_unique_id(struct stream *strm, struct list *format)
+struct ist stream_generate_unique_id(struct stream *strm, struct lf_expr *format)
{
if (isttest(strm->unique_id)) {
return strm->unique_id;
@@ -3494,9 +3482,8 @@ void strm_dump_to_buffer(struct buffer *buf, const struct stream *strm, const ch
* buffer is full and it needs to be called again, otherwise non-zero. It is
* designed to be called from stats_dump_strm_to_buffer() below.
*/
-static int stats_dump_full_strm_to_buffer(struct stconn *sc, struct stream *strm)
+static int stats_dump_full_strm_to_buffer(struct appctx *appctx, struct stream *strm)
{
- struct appctx *appctx = __sc_appctx(sc);
struct show_sess_ctx *ctx = appctx->svcctx;
chunk_reset(&trash);
@@ -3588,7 +3575,6 @@ static int cli_parse_show_sess(char **args, char *payload, struct appctx *appctx
static int cli_io_handler_dump_sess(struct appctx *appctx)
{
struct show_sess_ctx *ctx = appctx->svcctx;
- struct stconn *sc = appctx_sc(appctx);
struct connection *conn;
thread_isolate();
@@ -3598,18 +3584,6 @@ static int cli_io_handler_dump_sess(struct appctx *appctx)
goto done;
}
- /* FIXME: Don't watch the other side !*/
- if (unlikely(sc_opposite(sc)->flags & SC_FL_SHUT_DONE)) {
- /* If we're forced to shut down, we might have to remove our
- * reference to the last stream being dumped.
- */
- if (!LIST_ISEMPTY(&ctx->bref.users)) {
- LIST_DELETE(&ctx->bref.users);
- LIST_INIT(&ctx->bref.users);
- }
- goto done;
- }
-
chunk_reset(&trash);
/* first, let's detach the back-ref from a possible previous stream */
@@ -3666,7 +3640,7 @@ static int cli_io_handler_dump_sess(struct appctx *appctx)
LIST_APPEND(&curr_strm->back_refs, &ctx->bref.users);
/* call the proper dump() function and return if we're missing space */
- if (!stats_dump_full_strm_to_buffer(sc, curr_strm))
+ if (!stats_dump_full_strm_to_buffer(appctx, curr_strm))
goto full;
/* stream dump complete */
@@ -4036,6 +4010,19 @@ static int smp_fetch_id32(const struct arg *args, struct sample *smp, const char
return 1;
}
+static int smp_fetch_redispatched(const struct arg *args, struct sample *smp, const char *km, void *private)
+{
+ smp->flags = SMP_F_VOL_TXN;
+ smp->data.type = SMP_T_BOOL;
+ if (!smp->strm)
+ return 0;
+
+ if (!sc_state_in(smp->strm->scb->state, SC_SB_DIS|SC_SB_CLO))
+ smp->flags |= SMP_F_VOL_TEST;
+ smp->data.u.sint = !!(smp->strm->flags & SF_REDISP);
+ return 1;
+}
+
/* Note: must not be declared <const> as its list will be overwritten.
* Please take care of keeping this list alphabetically sorted.
*/
@@ -4047,6 +4034,7 @@ static struct sample_fetch_kw_list smp_kws = {ILH, {
{ "last_rule_line", smp_fetch_last_rule_line, 0, NULL, SMP_T_SINT, SMP_USE_INTRN, },
{ "txn.conn_retries", smp_fetch_conn_retries, 0, NULL, SMP_T_SINT, SMP_USE_L4SRV, },
{ "txn.id32", smp_fetch_id32, 0, NULL, SMP_T_SINT, SMP_USE_INTRN, },
+ { "txn.redispatched", smp_fetch_redispatched, 0, NULL, SMP_T_BOOL, SMP_USE_L4SRV, },
{ "txn.sess_term_state",smp_fetch_sess_term_state, 0, NULL, SMP_T_STR, SMP_USE_INTRN, },
{ NULL, NULL, 0, 0, 0 },
}};