diff options
Diffstat (limited to 'src/stream.c')
-rw-r--r-- | src/stream.c | 90 |
1 files changed, 39 insertions, 51 deletions
diff --git a/src/stream.c b/src/stream.c index e643a6d..ed5c268 100644 --- a/src/stream.c +++ b/src/stream.c @@ -320,15 +320,13 @@ int stream_buf_available(void *arg) { struct stream *s = arg; - if (!s->req.buf.size && !sc_ep_have_ff_data(s->scb) && s->scf->flags & SC_FL_NEED_BUFF && - b_alloc(&s->req.buf)) + if (!s->req.buf.size && !sc_ep_have_ff_data(s->scb) && s->scf->flags & SC_FL_NEED_BUFF) sc_have_buff(s->scf); - else if (!s->res.buf.size && !sc_ep_have_ff_data(s->scf) && s->scb->flags & SC_FL_NEED_BUFF && - b_alloc(&s->res.buf)) + + if (!s->res.buf.size && !sc_ep_have_ff_data(s->scf) && s->scb->flags & SC_FL_NEED_BUFF) sc_have_buff(s->scb); - else - return 0; + s->flags |= SF_MAYALLOC; task_wakeup(s->task, TASK_WOKEN_RES); return 1; @@ -632,8 +630,7 @@ void stream_free(struct stream *s) } /* We may still be present in the buffer wait queue */ - if (LIST_INLIST(&s->buffer_wait.list)) - LIST_DEL_INIT(&s->buffer_wait.list); + b_dequeue(&s->buffer_wait); if (s->req.buf.size || s->res.buf.size) { int count = !!s->req.buf.size + !!s->res.buf.size; @@ -752,8 +749,12 @@ void stream_free(struct stream *s) */ static int stream_alloc_work_buffer(struct stream *s) { - if (b_alloc(&s->res.buf)) + if (b_alloc(&s->res.buf, DB_CHANNEL | ((s->flags & SF_MAYALLOC) ? DB_F_NOQUEUE : 0))) { + s->flags &= ~SF_MAYALLOC; return 1; + } + + b_requeue(DB_CHANNEL, &s->buffer_wait); return 0; } @@ -920,7 +921,7 @@ void back_establish(struct stream *s) if (!IS_HTX_STRM(s)) { /* let's allow immediate data connection in this case */ /* if the user wants to log as soon as possible, without counting * bytes from the server, then this is the right moment. */ - if (!LIST_ISEMPTY(&strm_fe(s)->logformat) && !(s->logs.logwait & LW_BYTES)) { + if (!lf_expr_isempty(&strm_fe(s)->logformat) && !(s->logs.logwait & LW_BYTES)) { /* note: no pend_pos here, session is established */ s->logs.t_close = s->logs.t_connect; /* to get a valid end date */ s->do_log(s); @@ -1736,8 +1737,8 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) scb = s->scb; /* First, attempt to receive pending data from I/O layers */ - sc_conn_sync_recv(scf); - sc_conn_sync_recv(scb); + sc_sync_recv(scf); + sc_sync_recv(scb); /* Let's check if we're looping without making any progress, e.g. due * to a bogus analyser or the fact that we're ignoring a read0. The @@ -1794,25 +1795,12 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) } resync_stconns: - /* below we may emit error messages so we have to ensure that we have - * our buffers properly allocated. If the allocation failed, an error is - * triggered. - * - * NOTE: An error is returned because the mechanism to queue entities - * waiting for a buffer is totally broken for now. However, this - * part must be refactored. When it will be handled, this part - * must be be reviewed too. - */ if (!stream_alloc_work_buffer(s)) { - scf->flags |= SC_FL_ERROR; - s->conn_err_type = STRM_ET_CONN_RES; - - scb->flags |= SC_FL_ERROR; - s->conn_err_type = STRM_ET_CONN_RES; - - if (!(s->flags & SF_ERR_MASK)) - s->flags |= SF_ERR_RESOURCE; - sess_set_term_flags(s); + scf->flags &= ~SC_FL_DONT_WAKE; + scb->flags &= ~SC_FL_DONT_WAKE; + /* we're stuck for now */ + t->expire = TICK_ETERNITY; + goto leave; } /* 1b: check for low-level errors reported at the stream connector. @@ -2349,7 +2337,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) } /* Let's see if we can send the pending request now */ - sc_conn_sync_send(scb); + sc_sync_send(scb); /* * Now forward all shutdown requests between both sides of the request buffer @@ -2459,7 +2447,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) scf_flags = (scf_flags & ~(SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) | (scf->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)); /* Let's see if we can send the pending response now */ - sc_conn_sync_send(scf); + sc_sync_send(scf); /* * Now forward all shutdown requests between both sides of the buffer @@ -2552,7 +2540,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) stream_handle_timeouts(s); goto resync_stconns; } - + leave: s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES); stream_release_buffers(s); @@ -2597,7 +2585,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) } /* let's do a final log if we need it */ - if (!LIST_ISEMPTY(&sess->fe->logformat) && s->logs.logwait && + if (!lf_expr_isempty(&sess->fe->logformat) && s->logs.logwait && !(s->flags & SF_MONITOR) && (!(sess->fe->options & PR_O_NULLNOLOG) || req->total)) { /* we may need to know the position in the queue */ @@ -2847,7 +2835,7 @@ INITCALL0(STG_INIT, init_stream); * If an ID is already stored within the stream nothing happens existing unique ID is * returned. */ -struct ist stream_generate_unique_id(struct stream *strm, struct list *format) +struct ist stream_generate_unique_id(struct stream *strm, struct lf_expr *format) { if (isttest(strm->unique_id)) { return strm->unique_id; @@ -3494,9 +3482,8 @@ void strm_dump_to_buffer(struct buffer *buf, const struct stream *strm, const ch * buffer is full and it needs to be called again, otherwise non-zero. It is * designed to be called from stats_dump_strm_to_buffer() below. */ -static int stats_dump_full_strm_to_buffer(struct stconn *sc, struct stream *strm) +static int stats_dump_full_strm_to_buffer(struct appctx *appctx, struct stream *strm) { - struct appctx *appctx = __sc_appctx(sc); struct show_sess_ctx *ctx = appctx->svcctx; chunk_reset(&trash); @@ -3588,7 +3575,6 @@ static int cli_parse_show_sess(char **args, char *payload, struct appctx *appctx static int cli_io_handler_dump_sess(struct appctx *appctx) { struct show_sess_ctx *ctx = appctx->svcctx; - struct stconn *sc = appctx_sc(appctx); struct connection *conn; thread_isolate(); @@ -3598,18 +3584,6 @@ static int cli_io_handler_dump_sess(struct appctx *appctx) goto done; } - /* FIXME: Don't watch the other side !*/ - if (unlikely(sc_opposite(sc)->flags & SC_FL_SHUT_DONE)) { - /* If we're forced to shut down, we might have to remove our - * reference to the last stream being dumped. - */ - if (!LIST_ISEMPTY(&ctx->bref.users)) { - LIST_DELETE(&ctx->bref.users); - LIST_INIT(&ctx->bref.users); - } - goto done; - } - chunk_reset(&trash); /* first, let's detach the back-ref from a possible previous stream */ @@ -3666,7 +3640,7 @@ static int cli_io_handler_dump_sess(struct appctx *appctx) LIST_APPEND(&curr_strm->back_refs, &ctx->bref.users); /* call the proper dump() function and return if we're missing space */ - if (!stats_dump_full_strm_to_buffer(sc, curr_strm)) + if (!stats_dump_full_strm_to_buffer(appctx, curr_strm)) goto full; /* stream dump complete */ @@ -4036,6 +4010,19 @@ static int smp_fetch_id32(const struct arg *args, struct sample *smp, const char return 1; } +static int smp_fetch_redispatched(const struct arg *args, struct sample *smp, const char *km, void *private) +{ + smp->flags = SMP_F_VOL_TXN; + smp->data.type = SMP_T_BOOL; + if (!smp->strm) + return 0; + + if (!sc_state_in(smp->strm->scb->state, SC_SB_DIS|SC_SB_CLO)) + smp->flags |= SMP_F_VOL_TEST; + smp->data.u.sint = !!(smp->strm->flags & SF_REDISP); + return 1; +} + /* Note: must not be declared <const> as its list will be overwritten. * Please take care of keeping this list alphabetically sorted. */ @@ -4047,6 +4034,7 @@ static struct sample_fetch_kw_list smp_kws = {ILH, { { "last_rule_line", smp_fetch_last_rule_line, 0, NULL, SMP_T_SINT, SMP_USE_INTRN, }, { "txn.conn_retries", smp_fetch_conn_retries, 0, NULL, SMP_T_SINT, SMP_USE_L4SRV, }, { "txn.id32", smp_fetch_id32, 0, NULL, SMP_T_SINT, SMP_USE_INTRN, }, + { "txn.redispatched", smp_fetch_redispatched, 0, NULL, SMP_T_BOOL, SMP_USE_L4SRV, }, { "txn.sess_term_state",smp_fetch_sess_term_state, 0, NULL, SMP_T_STR, SMP_USE_INTRN, }, { NULL, NULL, 0, 0, 0 }, }}; |