summaryrefslogtreecommitdiffstats
path: root/include/haproxy/stconn.h
blob: 7869fa340625ac04c355650d479e83b85868e888 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
/*
 * include/haproxy/stconn.h
 * This file contains stream connector function prototypes
 *
 * Copyright 2021 Christopher Faulet <cfaulet@haproxy.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation, version 2.1
 * exclusively.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 */

#ifndef _HAPROXY_STCONN_H
#define _HAPROXY_STCONN_H

#include <haproxy/api.h>
#include <haproxy/connection.h>
#include <haproxy/htx-t.h>
#include <haproxy/obj_type.h>
#include <haproxy/stconn-t.h>

struct buffer;
struct session;
struct appctx;
struct stream;
struct check;

/* Evaluates to non-zero when sc_conn(<sc>) returns a connection and
 * IS_HTX_CONN() is true for that connection. <sc> is expanded twice, so it
 * must not be an expression with side effects.
 */
#define IS_HTX_SC(sc)     (sc_conn(sc) && IS_HTX_CONN(__sc_conn(sc)))

/* Allocation and release of a stream endpoint descriptor. sedesc_new() takes
 * no argument: it is declared with an explicit (void) so that the declaration
 * is a real prototype and calls passing arguments are rejected by the
 * compiler (an empty parameter list leaves the arguments unchecked before C23).
 */
struct sedesc *sedesc_new(void);
void sedesc_free(struct sedesc *sedesc);

/* Stream connector creation, respectively from an endpoint descriptor, a
 * stream or a health-check (per the parameter types), and its release. These
 * functions are only declared here.
 */
struct stconn *sc_new_from_endp(struct sedesc *sedesc, struct session *sess, struct buffer *input);
struct stconn *sc_new_from_strm(struct stream *strm, unsigned int flags);
struct stconn *sc_new_from_check(struct check *check, unsigned int flags);
void sc_free(struct stconn *sc);

/* Attach a mux (<target>/<ctx>) or a stream to an existing stream connector.
 * NOTE(review): the meaning of the int return value is not visible from this
 * header; check the definitions.
 */
int sc_attach_mux(struct stconn *sc, void *target, void *ctx);
int sc_attach_strm(struct stconn *sc, struct stream *strm);

/* Destruction of a stream connector and reset of its endpoint (defined
 * elsewhere; return value semantics to be confirmed in the definition).
 */
void sc_destroy(struct stconn *sc);
int sc_reset_endp(struct stconn *sc);

/* Presumably creates an appctx for applet <app> on <sc>; see the definition */
struct appctx *sc_applet_create(struct stconn *sc, struct applet *app);

/* Endpoint upgrade steps for a stream connector: prepare, then abort or commit
 * (ordering inferred from the names - TODO confirm against the definitions).
 */
void sc_conn_prepare_endp_upgrade(struct stconn *sc);
void sc_conn_abort_endp_upgrade(struct stconn *sc);
void sc_conn_commit_endp_upgrade(struct stconn *sc);

/* The se_fl_*() set of functions manipulate the stream endpoint flags from
 * the stream endpoint itself. The sc_ep_*() set of functions manipulate the
 * stream endpoint flags from the stream connector (ex. stconn).
 * _zero() clears all flags, _clr() clears a set of flags (&=~), _set() sets
 * a set of flags (|=), _test() tests the presence of a set of flags, _get()
 * retrieves the exact flags, _setall() replaces the flags with the new value.
 * All functions are purposely marked "forceinline" to avoid slowing down
 * debugging code too much. None of these functions is atomic-safe.
 */

/* stream endpoint version */
/* Clears all the flags of stream endpoint descriptor <se> */
static forceinline void se_fl_zero(struct sedesc *se)
{
	se->flags = 0U;
}

/* Replaces the flags of stream endpoint descriptor <se> with <all>. No
 * ERR_PENDING to ERROR promotion is performed here.
 */
static forceinline void se_fl_setall(struct sedesc *se, uint all)
{
	se->flags = all;
}

/* ORs <on> into se->flags. SE_FL_ERROR is added to <on> beforehand in two
 * cases: EOI/EOS is being set while SE_FL_ERR_PENDING is already present, or
 * SE_FL_ERR_PENDING is being set while EOI/EOS is already present.
 */
static forceinline void se_fl_set(struct sedesc *se, uint on)
{
	int promote = 0;

	if ((on & (SE_FL_EOS|SE_FL_EOI)) && (se->flags & SE_FL_ERR_PENDING))
		promote = 1;
	else if ((on & SE_FL_ERR_PENDING) && (se->flags & (SE_FL_EOI|SE_FL_EOS)))
		promote = 1;

	if (promote)
		on |= SE_FL_ERROR;

	se->flags |= on;
}

/* Removes the flags <off> from stream endpoint descriptor <se> */
static forceinline void se_fl_clr(struct sedesc *se, uint off)
{
	se->flags = se->flags & ~off;
}

/* Returns 1 if at least one of the flags in <test> is set on <se>, else 0 */
static forceinline uint se_fl_test(const struct sedesc *se, uint test)
{
	return (se->flags & test) != 0;
}

/* Returns the flags of stream endpoint descriptor <se>, unmodified */
static forceinline uint se_fl_get(const struct sedesc *se)
{
	return se->flags;
}

/* Reports an error on <se>: SE_FL_ERROR is set when EOS or EOI was already
 * reported on the endpoint, otherwise SE_FL_ERR_PENDING is set.
 */
static inline void se_fl_set_error(struct sedesc *se)
{
	se_fl_set(se, se_fl_test(se, (SE_FL_EOS|SE_FL_EOI)) ? SE_FL_ERROR : SE_FL_ERR_PENDING);
}

/* Sets SE_FL_EXP_NO_DATA on stream endpoint descriptor <se> */
static inline void se_expect_no_data(struct sedesc *se)
{
	se_fl_set(se, SE_FL_EXP_NO_DATA);
}

/* Clears SE_FL_EXP_NO_DATA on stream endpoint descriptor <se> */
static inline void se_expect_data(struct sedesc *se)
{
	se_fl_clr(se, SE_FL_EXP_NO_DATA);
}

/* Returns non-zero (1) if the iobuf of <se> holds fast-forwarded data, i.e. if
 * its <data> counter is non-zero or a pipe is attached, otherwise 0.
 *
 * The two conditions are tested separately instead of OR-ing the counter with
 * the pointer cast to an integer: that value may be wider than the unsigned
 * int returned here, and a non-zero value whose low bits are all zero would
 * be truncated to 0.
 */
static inline unsigned int se_have_ff_data(struct sedesc *se)
{
	return (se->iobuf.data || se->iobuf.pipe) ? 1U : 0U;
}

/* Returns the amount of data held in the iobuf of <se>: its own <data>
 * counter plus the <data> counter of the attached pipe, if any.
 */
static inline size_t se_ff_data(struct sedesc *se)
{
	size_t total = se->iobuf.data;

	if (se->iobuf.pipe)
		total += se->iobuf.pipe->data;
	return total;
}

/* stream connector version */
/* Clears all the flags of the endpoint descriptor attached to <sc> */
static forceinline void sc_ep_zero(struct stconn *sc)
{
	struct sedesc *se = sc->sedesc;

	se_fl_zero(se);
}

/* Replaces the flags of the endpoint descriptor attached to <sc> with <all> */
static forceinline void sc_ep_setall(struct stconn *sc, uint all)
{
	struct sedesc *se = sc->sedesc;

	se_fl_setall(se, all);
}

/* Sets the flags <on> on the endpoint descriptor attached to <sc>, through
 * se_fl_set() and thus with its ERR_PENDING to ERROR promotion.
 */
static forceinline void sc_ep_set(struct stconn *sc, uint on)
{
	struct sedesc *se = sc->sedesc;

	se_fl_set(se, on);
}

/* Removes the flags <off> from the endpoint descriptor attached to <sc> */
static forceinline void sc_ep_clr(struct stconn *sc, uint off)
{
	struct sedesc *se = sc->sedesc;

	se_fl_clr(se, off);
}

/* Returns non-zero if at least one of the flags in <test> is set on the
 * endpoint descriptor attached to <sc>.
 */
static forceinline uint sc_ep_test(const struct stconn *sc, uint test)
{
	const struct sedesc *se = sc->sedesc;

	return se_fl_test(se, test);
}

/* Returns the flags of the endpoint descriptor attached to <sc> */
static forceinline uint sc_ep_get(const struct stconn *sc)
{
	const struct sedesc *se = sc->sedesc;

	return se_fl_get(se);
}

/* Returns the <lra> (last read activity) timestamp of the endpoint descriptor
 * attached to <sc>. The value may be TICK_ETERNITY.
 */
static forceinline unsigned int sc_ep_lra(const struct stconn *sc)
{
	return sc->sedesc->lra;
}

/* Returns the <fsb> (first send blocked) timestamp of the endpoint descriptor
 * attached to <sc>. The value may be TICK_ETERNITY.
 */
static forceinline unsigned int sc_ep_fsb(const struct stconn *sc)
{
	return sc->sedesc->fsb;
}

/* Records a read activity on <sc> by storing now_ms into the <lra> field of
 * its endpoint descriptor.
 */
static forceinline void sc_ep_report_read_activity(struct stconn *sc)
{
	sc->sedesc->lra = now_ms;
}

/* Records a blocked send on <sc>. <fsb> is left untouched when it is already
 * set and nothing was sent (<did_send> == 0). Otherwise it is (re)armed to
 * now_ms, and if something was sent (<did_send> != 0) and the stream is not
 * independent (SC_FL_INDEP_STR not set), a read activity is reported as well.
 */
static forceinline void sc_ep_report_blocked_send(struct stconn *sc, int did_send)
{
	/* already blocked and no progress since then: keep the first date */
	if (!did_send && tick_isset(sc->sedesc->fsb))
		return;

	sc->sedesc->fsb = now_ms;

	if (did_send && !(sc->flags & SC_FL_INDEP_STR))
		sc_ep_report_read_activity(sc);
}

/* Records a send activity on <sc>: <fsb> is reset to TICK_ETERNITY and, unless
 * SC_FL_INDEP_STR is set on the connector, a read activity is reported too.
 */
static forceinline void sc_ep_report_send_activity(struct stconn *sc)
{
	sc->sedesc->fsb = TICK_ETERNITY;

	if (sc->flags & SC_FL_INDEP_STR)
		return;

	sc_ep_report_read_activity(sc);
}

/* Stream connector flavour of se_have_ff_data(): returns its result for the
 * endpoint descriptor attached to <sc>.
 */
static forceinline unsigned int sc_ep_have_ff_data(struct stconn *sc)
{
	struct sedesc *se = sc->sedesc;

	return se_have_ff_data(se);
}

/* Stream connector flavour of se_ff_data(): returns its result for the
 * endpoint descriptor attached to <sc>.
 */
static forceinline size_t sc_ep_ff_data(struct stconn *sc)
{
	struct sedesc *se = sc->sedesc;

	return se_ff_data(se);
}

/* Returns the stream endpoint (the <se> pointer of the endpoint descriptor)
 * of connector <sc>, without any check on its type.
 */
static inline void *__sc_endp(const struct stconn *sc)
{
	return sc->sedesc->se;
}

/* Connection accessors for a stream connector. __sc_conn() returns the <conn>
 * pointer of the endpoint descriptor without any check, while sc_conn() first
 * checks that the endpoint is a mux stream and returns NULL otherwise.
 */
static inline struct connection *__sc_conn(const struct stconn *sc)
{
	return sc->sedesc->conn;
}
/* Returns the connection of <sc> when its endpoint has SE_FL_T_MUX set, NULL
 * otherwise.
 */
static inline struct connection *sc_conn(const struct stconn *sc)
{
	return sc_ep_test(sc, SE_FL_T_MUX) ? __sc_conn(sc) : NULL;
}

/* Returns the mux ops (<mux> field) of the connection behind <sc>, or NULL
 * when sc_conn() reports no connection, i.e. the endpoint is not a mux stream.
 */
static inline const struct mux_ops *sc_mux_ops(const struct stconn *sc)
{
	const struct connection *conn = sc_conn(sc);

	if (!conn)
		return NULL;
	return conn->mux;
}

/* Mux stream accessors for a stream connector. __sc_mux_strm() returns the
 * endpoint pointer as-is, without any check, while sc_mux_strm() first checks
 * that the endpoint is a mux and returns NULL otherwise.
 */
static inline void *__sc_mux_strm(const struct stconn *sc)
{
	void *endp = __sc_endp(sc);

	return endp;
}
/* Returns the mux stream of <sc> when its endpoint has SE_FL_T_MUX set, NULL
 * otherwise. The mux stream is an opaque pointer, so it is returned as a
 * void * exactly like __sc_mux_strm() does, and not as a struct appctx *
 * which designates the other kind of endpoint.
 */
static inline void *sc_mux_strm(const struct stconn *sc)
{
	if (sc_ep_test(sc, SE_FL_T_MUX))
		return __sc_mux_strm(sc);
	return NULL;
}

/* Appctx accessors for a stream connector. __sc_appctx() returns the endpoint
 * pointer as an appctx without any check, while sc_appctx() first checks that
 * the endpoint is an applet and returns NULL otherwise.
 */
static inline struct appctx *__sc_appctx(const struct stconn *sc)
{
	struct appctx *appctx = __sc_endp(sc);

	return appctx;
}
/* Returns the appctx of <sc> when its endpoint has SE_FL_T_APPLET set, NULL
 * otherwise.
 */
static inline struct appctx *sc_appctx(const struct stconn *sc)
{
	return sc_ep_test(sc, SE_FL_T_APPLET) ? __sc_appctx(sc) : NULL;
}

/* Stream accessors for a stream connector. __sc_strm() converts the <app>
 * object to a stream without any check, while sc_strm() first checks that the
 * application is a stream and returns NULL otherwise.
 */
static inline struct stream *__sc_strm(const struct stconn *sc)
{
	struct stream *strm = __objt_stream(sc->app);

	return strm;
}

/* Returns the stream owning <sc> when the type of its <app> object is
 * OBJ_TYPE_STREAM, NULL otherwise.
 */
static inline struct stream *sc_strm(const struct stconn *sc)
{
	if (obj_type(sc->app) != OBJ_TYPE_STREAM)
		return NULL;
	return __sc_strm(sc);
}

/* Health-check accessors for a stream connector. __sc_check() converts the
 * <app> object to a check without any verification, while sc_check() first
 * checks that the application is a health-check and returns NULL otherwise.
 */
static inline struct check *__sc_check(const struct stconn *sc)
{
	struct check *check = __objt_check(sc->app);

	return check;
}
/* Returns the health-check owning <sc> when the type of its <app> object is
 * OBJ_TYPE_CHECK, NULL otherwise.
 */
static inline struct check *sc_check(const struct stconn *sc)
{
	if (obj_type(sc->app) != OBJ_TYPE_CHECK)
		return NULL;
	return __objt_check(sc->app);
}

/* Returns the name of the application layer attached to <sc> (the <name>
 * field of its app_ops), or "NONE" when no app_ops is set.
 */
static inline const char *sc_get_data_name(const struct stconn *sc)
{
	return sc->app_ops ? sc->app_ops->name : "NONE";
}

/* Shuts the read side of <sc>, which must be attached to a connection (this
 * is enforced by a BUG_ON). Nothing is done if a flag of SE_FL_SHR is already
 * set on the endpoint. Otherwise the mux's ->shutr() callback is called when
 * there is one, then SE_FL_SHRD is set for CO_SHR_DRAIN, or SE_FL_SHRR for any
 * other <mode>.
 */
static inline void sc_conn_shutr(struct stconn *sc, enum co_shr_mode mode)
{
	const struct mux_ops *ops;

	BUG_ON(!sc_conn(sc));

	if (!sc_ep_test(sc, SE_FL_SHR)) {
		/* clean data-layer shutdown */
		ops = sc_mux_ops(sc);
		if (ops && ops->shutr)
			ops->shutr(sc, mode);

		if (mode == CO_SHR_DRAIN)
			sc_ep_set(sc, SE_FL_SHRD);
		else
			sc_ep_set(sc, SE_FL_SHRR);
	}
}

/* Shuts the write side of <sc>, which must be attached to a connection (this
 * is enforced by a BUG_ON). Nothing is done if a flag of SE_FL_SHW is already
 * set on the endpoint. Otherwise the mux's ->shutw() callback is called when
 * there is one, then SE_FL_SHWN is set for CO_SHW_NORMAL, or SE_FL_SHWS for
 * any other <mode>.
 */
static inline void sc_conn_shutw(struct stconn *sc, enum co_shw_mode mode)
{
	const struct mux_ops *ops;

	BUG_ON(!sc_conn(sc));

	if (!sc_ep_test(sc, SE_FL_SHW)) {
		/* clean data-layer shutdown */
		ops = sc_mux_ops(sc);
		if (ops && ops->shutw)
			ops->shutw(sc, mode);

		if (mode == CO_SHW_NORMAL)
			sc_ep_set(sc, SE_FL_SHWN);
		else
			sc_ep_set(sc, SE_FL_SHWS);
	}
}

/* Closes both directions of a connection-backed stream connector without
 * detaching it: a silent write shutdown first, then a read shutdown in reset
 * mode.
 */
static inline void sc_conn_shut(struct stconn *sc)
{
	/* write side first, then read side */
	sc_conn_shutw(sc, CO_SHW_SILENT);
	sc_conn_shutr(sc, CO_SHR_RESET);
}

/* Closes both directions of a connection-backed stream connector without
 * detaching it: a silent write shutdown first, then a read shutdown in drain
 * mode so that possibly pending data are drained.
 */
static inline void sc_conn_drain_and_shut(struct stconn *sc)
{
	/* write side first, then read side */
	sc_conn_shutw(sc, CO_SHW_SILENT);
	sc_conn_shutr(sc, CO_SHR_DRAIN);
}

/* Returns 1 if SC_FL_NEED_ROOM is set on <sc>, i.e. its Rx path is blocked by
 * a lack of room in the input buffer as reported via sc_need_room(), otherwise
 * 0. The result must not be ignored by the caller.
 */
__attribute__((warn_unused_result))
static inline int sc_waiting_room(const struct stconn *sc)
{
	return (sc->flags & SC_FL_NEED_ROOM) != 0;
}

/* Called by the stream endpoint to announce that it has more data to deliver
 * to the stream's input buffer: SE_FL_HAVE_NO_DATA is cleared on <se>.
 */
static inline void se_have_more_data(struct sedesc *se)
{
	se_fl_clr(se, SE_FL_HAVE_NO_DATA);
}

/* Called by the stream endpoint to announce that it has no more data for the
 * stream's input buffer: SE_FL_HAVE_NO_DATA is set on <se>.
 */
static inline void se_have_no_more_data(struct sedesc *se)
{
	se_fl_set(se, SE_FL_HAVE_NO_DATA);
}

/* Called by the application layer to tell <sc> that it is willing to receive
 * data from the endpoint again. If SC_FL_WONT_READ was set, it is cleared and
 * a read activity is reported; otherwise nothing is done.
 */
static inline void sc_will_read(struct stconn *sc)
{
	if (!(sc->flags & SC_FL_WONT_READ))
		return;

	sc->flags &= ~SC_FL_WONT_READ;
	sc_ep_report_read_activity(sc);
}

/* Called by the application layer to tell <sc> that it will not receive data
 * from the endpoint (e.g. need to flush, bandwidth limitation): sets
 * SC_FL_WONT_READ. It usually corresponds to the channel's CF_DONT_READ flag.
 */
static inline void sc_wont_read(struct stconn *sc)
{
	sc->flags = sc->flags | SC_FL_WONT_READ;
}

/* Called by a frontend (applet) stream endpoint to tell the connector that it
 * needs the other side to connect or fail before going on, for example so
 * that an applet does not deliver data to a request channel before the
 * connection is confirmed. It sets SE_FL_APPLET_NEED_CONN on <se>.
 */
static inline void se_need_remote_conn(struct sedesc *se)
{
	se_fl_set(se, SE_FL_APPLET_NEED_CONN);
}

/* Called by the application layer to tell <sc> that the input buffer it was
 * waiting for is now available. If SC_FL_NEED_BUFF was set, it is cleared and
 * a read activity is reported; otherwise nothing is done.
 */
static inline void sc_have_buff(struct stconn *sc)
{
	if (!(sc->flags & SC_FL_NEED_BUFF))
		return;

	sc->flags &= ~SC_FL_NEED_BUFF;
	sc_ep_report_read_activity(sc);
}

/* Marks <sc> as waiting for an input buffer it failed to get, by setting
 * SC_FL_NEED_BUFF. This expresses a willingness to deliver data that will have
 * to be retried, so callers will often also clear SE_FL_HAVE_NO_DATA in order
 * to be called again as soon as SC_FL_NEED_BUFF is cleared.
 */
static inline void sc_need_buff(struct stconn *sc)
{
	sc->flags = sc->flags | SC_FL_NEED_BUFF;
}

/* Tells <sc> that some room was made in the input buffer, so that a failed
 * attempt to inject data may be retried. It is usually called after buffer
 * contents were successfully transferred to the other side. If SC_FL_NEED_ROOM
 * was set, it is cleared, <room_needed> is reset to 0 and a read activity is
 * reported; otherwise nothing is done.
 */
static inline void sc_have_room(struct stconn *sc)
{
	if (!(sc->flags & SC_FL_NEED_ROOM))
		return;

	sc->flags &= ~SC_FL_NEED_ROOM;
	sc->room_needed = 0;
	sc_ep_report_read_activity(sc);
}

/* Reports that data could not be put into the input buffer of <sc> by lack of
 * room: SC_FL_NEED_ROOM is set and <room_needed> is stored in the connector.
 * This expresses a willingness to deliver data that will have to be retried,
 * so the caller will usually also clear SE_FL_HAVE_NO_DATA in order to be
 * called again as soon as SC_FL_NEED_ROOM is cleared.
 *
 * The caller is responsible for specifying the amount of free space required
 * to progress, which must not exceed the buffer size (global.tune.bufsize);
 * this is checked with BUG_ON_HOT().
 */
static inline void sc_need_room(struct stconn *sc, ssize_t room_needed)
{
	sc->flags = sc->flags | SC_FL_NEED_ROOM;

	/* asking for more than a whole buffer could never be satisfied */
	BUG_ON_HOT(room_needed > (ssize_t)global.tune.bufsize);

	sc->room_needed = room_needed;
}

/* Called by the stream endpoint to indicate that it is ready to consume data
 * from the stream's output buffer. If SE_FL_WONT_CONSUME was set on <se>, it
 * is cleared and a send activity is reported on the attached connector;
 * otherwise nothing is done.
 */
static inline void se_will_consume(struct sedesc *se)
{
	if (!se_fl_test(se, SE_FL_WONT_CONSUME))
		return;

	se_fl_clr(se, SE_FL_WONT_CONSUME);
	sc_ep_report_send_activity(se->sc);
}

/* Called by the stream endpoint to indicate that it is not willing to consume
 * data from the stream's output buffer: sets SE_FL_WONT_CONSUME on <se>.
 */
static inline void se_wont_consume(struct sedesc *se)
{
	se_fl_set(se, SE_FL_WONT_CONSUME);
}

/* Called by the stream endpoint to indicate that it is willing to consume
 * data from the stream's output buffer but that there is not enough of it, so
 * it does not want to be woken up until more is presented. The endpoint is
 * first unblocked via se_will_consume(), then SE_FL_WAIT_DATA is set on <se>.
 */
static inline void se_need_more_data(struct sedesc *se)
{
	/* unblock the consumer side before flagging the wait for data */
	se_will_consume(se);
	se_fl_set(se, SE_FL_WAIT_DATA);
}


/* Negotiates a fast-forward with the mux behind <se>, passing it <input>,
 * <count> and <may_splice>. Returns the value returned by the mux's
 * ->nego_fastfwd() callback, or 0 when it was not called.
 *
 * IOBUF_FL_NO_FF is set on the iobuf and the callback is not called when <se>
 * is not a mux endpoint, when the mux lacks ->nego_fastfwd() or
 * ->done_fastfwd(), or when EOS, an error or a pending error was already
 * reported on <se>. For a mux endpoint, IOBUF_FL_FF_BLOCKED is always cleared
 * first; if it is set again after the negotiation, a blocked send is reported
 * and the connector is subscribed for sends unless it already is.
 */
static inline size_t se_nego_ff(struct sedesc *se, struct buffer *input, size_t count, unsigned int may_splice)
{
	const struct mux_ops *mux;
	size_t ret;

	if (!se_fl_test(se, SE_FL_T_MUX))
		goto no_ff;

	mux = se->conn->mux;
	se->iobuf.flags &= ~IOBUF_FL_FF_BLOCKED;

	/* both callbacks are required to fast-forward data */
	if (!mux->nego_fastfwd || !mux->done_fastfwd)
		goto no_ff;

	/* Disable zero-copy forwarding if EOS or an error was reported. */
	if (se_fl_test(se, SE_FL_EOS|SE_FL_ERROR|SE_FL_ERR_PENDING))
		goto no_ff;

	ret = mux->nego_fastfwd(se->sc, input, count, may_splice);

	if (se->iobuf.flags & IOBUF_FL_FF_BLOCKED) {
		sc_ep_report_blocked_send(se->sc, 0);

		/* The SC must be subscribed for sends to be notified when
		 * some space is made
		 */
		if (!(se->sc->wait_event.events & SUB_RETRY_SEND))
			mux->subscribe(se->sc, SUB_RETRY_SEND, &se->sc->wait_event);
	}
	return ret;

  no_ff:
	se->iobuf.flags |= IOBUF_FL_NO_FF;
	return 0;
}

/* Terminates a fast-forward on <se>. Nothing is done unless <se> is a mux
 * endpoint. Otherwise the mux's ->done_fastfwd() callback, which must exist
 * (BUG_ON), is called. If there were data to send before the call, a send
 * activity is reported when everything was sent, otherwise a blocked send is
 * reported, indicating whether something was sent at all.
 */
static inline void se_done_ff(struct sedesc *se)
{
	const struct mux_ops *mux;
	size_t sent, to_send;

	if (!se_fl_test(se, SE_FL_T_MUX))
		return;

	mux = se->conn->mux;
	/* must be sampled before the mux consumes the data */
	to_send = se_ff_data(se);

	BUG_ON(!mux->done_fastfwd);
	sent = mux->done_fastfwd(se->sc);

	if (!to_send)
		return;

	if (sent != to_send)
		sc_ep_report_blocked_send(se->sc, sent != 0);
	else
		sc_ep_report_send_activity(se->sc);
}

#endif /* _HAPROXY_STCONN_H */