summaryrefslogtreecommitdiffstats
path: root/src/mux_pt.c
blob: 3cca6a1bfb81280c4573343450a4c94e3164f4be (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
/*
 * Pass-through mux-demux for connections
 *
 * Copyright 2017 Willy Tarreau <w@1wt.eu>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 *
 */

#include <haproxy/api.h>
#include <haproxy/buf.h>
#include <haproxy/cfgparse.h>
#include <haproxy/connection.h>
#include <haproxy/pipe.h>
#include <haproxy/stconn.h>
#include <haproxy/stream.h>
#include <haproxy/task.h>
#include <haproxy/trace.h>
#include <haproxy/xref.h>

struct mux_pt_ctx {
	struct sedesc *sd;
	struct connection *conn;
	struct wait_event wait_event;
};

DECLARE_STATIC_POOL(pool_head_pt_ctx, "mux_pt", sizeof(struct mux_pt_ctx));

/* trace source and events */
static void pt_trace(enum trace_level level, uint64_t mask,
                     const struct trace_source *src,
                     const struct ist where, const struct ist func,
                     const void *a1, const void *a2, const void *a3, const void *a4);

/* The event representation is split like this :
 *   pt_ctx - internal PT context
 *   strm   - application layer
 */
static const struct trace_event pt_trace_events[] = {
#define           PT_EV_CONN_NEW      (1ULL <<  0)
	{ .mask = PT_EV_CONN_NEW,     .name = "pt_conn_new",  .desc = "new PT connection" },
#define           PT_EV_CONN_WAKE     (1ULL <<  1)
	{ .mask = PT_EV_CONN_WAKE,    .name = "pt_conn_wake", .desc = "PT connection woken up" },
#define           PT_EV_CONN_END      (1ULL <<  2)
	{ .mask = PT_EV_CONN_END,     .name = "pt_conn_end",  .desc = "PT connection terminated" },
#define           PT_EV_CONN_ERR      (1ULL <<  3)
	{ .mask = PT_EV_CONN_ERR,     .name = "pt_conn_err",  .desc = "error on PT connection" },
#define           PT_EV_STRM_NEW      (1ULL <<  4)
	{ .mask = PT_EV_STRM_NEW,     .name = "strm_new",      .desc = "app-layer stream creation" },
#define           PT_EV_STRM_SHUT     (1ULL <<  5)
	{ .mask = PT_EV_STRM_SHUT,    .name = "strm_shut",     .desc = "stream shutdown" },
#define           PT_EV_STRM_END      (1ULL <<  6)
	{ .mask = PT_EV_STRM_END,     .name = "strm_end",      .desc = "detaching app-layer stream" },
#define           PT_EV_STRM_ERR      (1ULL <<  7)
	{ .mask = PT_EV_STRM_ERR,     .name = "strm_err",      .desc = "stream error" },
#define           PT_EV_RX_DATA       (1ULL <<  8)
	{ .mask = PT_EV_RX_DATA,      .name = "pt_rx_data", .desc = "Rx on PT connection" },
#define           PT_EV_TX_DATA       (1ULL <<  9)
	{ .mask = PT_EV_TX_DATA,      .name = "pt_tx_data", .desc = "Tx on PT connection" },

	{}
};


static const struct name_desc pt_trace_decoding[] = {
#define PT_VERB_CLEAN    1
	{ .name="clean",    .desc="only user-friendly stuff, generally suitable for level \"user\"" },
#define PT_VERB_MINIMAL  2
	{ .name="minimal",  .desc="report only h1c/h1s state and flags, no real decoding" },
#define PT_VERB_SIMPLE   3
	{ .name="simple",   .desc="add request/response status line or htx info when available" },
#define PT_VERB_ADVANCED 4
	{ .name="advanced", .desc="add header fields or frame decoding when available" },
#define PT_VERB_COMPLETE 5
	{ .name="complete", .desc="add full data dump when available" },
	{ /* end */ }
};

static struct trace_source trace_pt __read_mostly = {
	.name = IST("pt"),
	.desc = "Passthrough multiplexer",
	.arg_def = TRC_ARG1_CONN,  // TRACE()'s first argument is always a connection
	.default_cb = pt_trace,
	.known_events = pt_trace_events,
	.lockon_args = NULL,
	.decoding = pt_trace_decoding,
	.report_events = ~0,  // report everything by default
};

#define TRACE_SOURCE &trace_pt
INITCALL1(STG_REGISTER, trace_register_source, TRACE_SOURCE);

/* returns the stconn associated to the stream */
static forceinline struct stconn *pt_sc(const struct mux_pt_ctx *pt)
{
	return pt->sd->sc;
}

static inline void pt_trace_buf(const struct buffer *buf, size_t ofs, size_t len)
{
	size_t block1, block2;
	int line, ptr, newptr;

	block1 = b_contig_data(buf, ofs);
	block2 = 0;
	if (block1 > len)
		block1 = len;
	block2 = len - block1;

	ofs = b_peek_ofs(buf, ofs);

	line = 0;
	ptr = ofs;
	while (ptr < ofs + block1) {
		newptr = dump_text_line(&trace_buf, b_orig(buf), b_size(buf), ofs + block1, &line, ptr);
		if (newptr == ptr)
			break;
		ptr = newptr;
	}

	line = ptr = 0;
	while (ptr < block2) {
		newptr = dump_text_line(&trace_buf, b_orig(buf), b_size(buf), block2, &line, ptr);
		if (newptr == ptr)
			break;
		ptr = newptr;
	}
}

/* the PT traces always expect that arg1, if non-null, is of type connection
 * (from which we can derive the pt context), that arg2, if non-null, is a
 * stream connector, and that arg3, if non-null, is a buffer.
 */
static void pt_trace(enum trace_level level, uint64_t mask, const struct trace_source *src,
                     const struct ist where, const struct ist func,
                     const void *a1, const void *a2, const void *a3, const void *a4)
{
	const struct connection *conn = a1;
	const struct mux_pt_ctx *ctx = conn ? conn->ctx : NULL;
	const struct stconn *sc = a2;
	const struct buffer *buf = a3;
	const size_t *val = a4;

	if (!ctx || src->verbosity < PT_VERB_CLEAN)
		return;

	/* Display frontend/backend info by default */
	chunk_appendf(&trace_buf, " : [%c]", (conn_is_back(conn) ? 'B' : 'F'));

	if (src->verbosity == PT_VERB_CLEAN)
		return;

	if (!sc)
		sc = pt_sc(ctx);

	/* Display the value to the 4th argument (level > STATE) */
	if (src->level > TRACE_LEVEL_STATE && val)
		chunk_appendf(&trace_buf, " - VAL=%lu", (long)*val);

	/* Display conn and sc info, if defined (pointer + flags) */
	chunk_appendf(&trace_buf, " - conn=%p(0x%08x)", conn, conn->flags);
	chunk_appendf(&trace_buf, " sd=%p(0x%08x)", ctx->sd, se_fl_get(ctx->sd));
	if (sc)
		chunk_appendf(&trace_buf, " sc=%p(0x%08x)", sc, sc->flags);

	if (src->verbosity == PT_VERB_MINIMAL)
		return;

	/* Display buffer info, if defined (level > USER & verbosity > SIMPLE) */
	if (src->level > TRACE_LEVEL_USER && buf) {
		int full = 0, max = 3000, chunk = 1024;

		/* Full info (level > STATE && verbosity > SIMPLE) */
		if (src->level > TRACE_LEVEL_STATE) {
			if (src->verbosity == PT_VERB_COMPLETE)
				full = 1;
			else if (src->verbosity == PT_VERB_ADVANCED) {
				full = 1;
				max = 256;
				chunk = 64;
			}
		}

		chunk_appendf(&trace_buf, " buf=%u@%p+%u/%u",
			      (unsigned int)b_data(buf), b_orig(buf),
			      (unsigned int)b_head_ofs(buf), (unsigned int)b_size(buf));

		if (b_data(buf) && full) {
			chunk_memcat(&trace_buf, "\n", 1);
			if (b_data(buf) < max)
				pt_trace_buf(buf, 0, b_data(buf));
			else {
				pt_trace_buf(buf, 0, chunk);
				chunk_memcat(&trace_buf, "  ...\n", 6);
				pt_trace_buf(buf, b_data(buf) - chunk, chunk);
			}
		}
	}
}

static void mux_pt_destroy(struct mux_pt_ctx *ctx)
{
	struct connection *conn = NULL;

	TRACE_POINT(PT_EV_CONN_END);

	/* The connection must be attached to this mux to be released */
	if (ctx->conn && ctx->conn->ctx == ctx)
		conn = ctx->conn;

	tasklet_free(ctx->wait_event.tasklet);

	if (conn && ctx->wait_event.events != 0)
		conn->xprt->unsubscribe(conn, conn->xprt_ctx, ctx->wait_event.events,
					&ctx->wait_event);
	BUG_ON(ctx->sd && !se_fl_test(ctx->sd, SE_FL_ORPHAN));
	sedesc_free(ctx->sd);
	pool_free(pool_head_pt_ctx, ctx);

	if (conn) {
		conn->mux = NULL;
		conn->ctx = NULL;
		TRACE_DEVEL("freeing conn", PT_EV_CONN_END, conn);

		conn_stop_tracking(conn);
		conn_full_close(conn);
		if (conn->destroy_cb)
			conn->destroy_cb(conn);
		conn_free(conn);
	}
}

/* Callback, used when we get I/Os while in idle mode. This one is exported so
 * that "show fd" can resolve it.
 */
struct task *mux_pt_io_cb(struct task *t, void *tctx, unsigned int status)
{
	struct mux_pt_ctx *ctx = tctx;

	TRACE_ENTER(PT_EV_CONN_WAKE, ctx->conn);
	if (!se_fl_test(ctx->sd, SE_FL_ORPHAN)) {
		/* There's a small race condition.
		 * mux_pt_io_cb() is only supposed to be called if we have no
		 * stream attached. However, maybe the tasklet got woken up,
		 * and this connection was then attached to a new stream.
		 * If this happened, just wake the tasklet up if anybody
		 * subscribed to receive events, and otherwise call the wake
		 * method, to make sure the event is noticed.
		 */
		if (ctx->conn->subs) {
			ctx->conn->subs->events = 0;
			tasklet_wakeup(ctx->conn->subs->tasklet);
			ctx->conn->subs = NULL;
		} else if (pt_sc(ctx)->app_ops->wake)
			pt_sc(ctx)->app_ops->wake(pt_sc(ctx));
		TRACE_DEVEL("leaving waking up SC", PT_EV_CONN_WAKE, ctx->conn);
		return t;
	}
	conn_ctrl_drain(ctx->conn);
	if (ctx->conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH)) {
		TRACE_DEVEL("leaving destroying pt context", PT_EV_CONN_WAKE, ctx->conn);
		mux_pt_destroy(ctx);
		t = NULL;
	}
	else {
		ctx->conn->xprt->subscribe(ctx->conn, ctx->conn->xprt_ctx, SUB_RETRY_RECV,
					   &ctx->wait_event);
		TRACE_DEVEL("leaving subscribing for reads", PT_EV_CONN_WAKE, ctx->conn);
	}

	return t;
}

/* Initialize the mux once it's attached. It is expected that conn->ctx points
 * to the existing stream connector (for outgoing connections) or NULL (for
 * incoming ones, in which case one will be allocated and a new stream will be
 * instantiated). Returns < 0 on error.
 */
static int mux_pt_init(struct connection *conn, struct proxy *prx, struct session *sess,
		       struct buffer *input)
{
	struct stconn *sc = conn->ctx;
	struct mux_pt_ctx *ctx = pool_alloc(pool_head_pt_ctx);

	TRACE_ENTER(PT_EV_CONN_NEW);

	if (!ctx) {
		TRACE_ERROR("PT context allocation failure", PT_EV_CONN_NEW|PT_EV_CONN_END|PT_EV_CONN_ERR);
		goto fail;
	}

	ctx->wait_event.tasklet = tasklet_new();
	if (!ctx->wait_event.tasklet)
		goto fail_free_ctx;
	ctx->wait_event.tasklet->context = ctx;
	ctx->wait_event.tasklet->process = mux_pt_io_cb;
	ctx->wait_event.events = 0;
	ctx->conn = conn;

	if (!sc) {
		ctx->sd = sedesc_new();
		if (!ctx->sd) {
			TRACE_ERROR("SC allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
			goto fail_free_ctx;
		}
		ctx->sd->se     = ctx;
		ctx->sd->conn   = conn;
		se_fl_set(ctx->sd, SE_FL_T_MUX | SE_FL_ORPHAN);

		sc = sc_new_from_endp(ctx->sd, sess, input);
		if (!sc) {
			TRACE_ERROR("SC allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
			goto fail_free_sd;
		}
		TRACE_POINT(PT_EV_STRM_NEW, conn, sc);
	}
	else {
		if (sc_attach_mux(sc, ctx, conn) < 0)
			goto fail_free_ctx;
		ctx->sd = sc->sedesc;
	}
	conn->ctx = ctx;
	se_fl_set(ctx->sd, SE_FL_RCV_MORE);
	if ((global.tune.options & GTUNE_USE_SPLICE) && !(global.tune.no_zero_copy_fwd & NO_ZERO_COPY_FWD_PT))
		se_fl_set(ctx->sd, SE_FL_MAY_FASTFWD_PROD|SE_FL_MAY_FASTFWD_CONS);

	TRACE_LEAVE(PT_EV_CONN_NEW, conn);
	return 0;

 fail_free_sd:
	sedesc_free(ctx->sd);
 fail_free_ctx:
	tasklet_free(ctx->wait_event.tasklet);
	pool_free(pool_head_pt_ctx, ctx);
 fail:
	TRACE_DEVEL("leaving in error", PT_EV_CONN_NEW|PT_EV_CONN_END|PT_EV_CONN_ERR);
	return -1;
}

/* callback to be used by default for the pass-through mux. It calls the data
 * layer wake() callback if it is set otherwise returns 0.
 */
static int mux_pt_wake(struct connection *conn)
{
	struct mux_pt_ctx *ctx = conn->ctx;
	int ret = 0;

	TRACE_ENTER(PT_EV_CONN_WAKE, ctx->conn);
	if (!se_fl_test(ctx->sd, SE_FL_ORPHAN)) {
		ret = pt_sc(ctx)->app_ops->wake ? pt_sc(ctx)->app_ops->wake(pt_sc(ctx)) : 0;

		if (ret < 0) {
			TRACE_DEVEL("leaving waking up SC", PT_EV_CONN_WAKE, ctx->conn);
			return ret;
		}
	} else {
		conn_ctrl_drain(conn);
		if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) {
			TRACE_DEVEL("leaving destroying PT context", PT_EV_CONN_WAKE, ctx->conn);
			mux_pt_destroy(ctx);
			return -1;
		}
	}

	/* If we had early data, and we're done with the handshake
	 * then we know the data are safe, and we can remove the flag.
	 */
	if ((conn->flags & (CO_FL_EARLY_DATA | CO_FL_EARLY_SSL_HS | CO_FL_WAIT_XPRT)) ==
	    CO_FL_EARLY_DATA)
		conn->flags &= ~CO_FL_EARLY_DATA;

	TRACE_LEAVE(PT_EV_CONN_WAKE, ctx->conn);
	return ret;
}

/*
 * Attach a new stream to a connection
 * (Used for outgoing connections)
 */
static int mux_pt_attach(struct connection *conn, struct sedesc *sd, struct session *sess)
{
	struct mux_pt_ctx *ctx = conn->ctx;

	TRACE_ENTER(PT_EV_STRM_NEW, conn);
	if (ctx->wait_event.events)
		conn->xprt->unsubscribe(ctx->conn, conn->xprt_ctx, SUB_RETRY_RECV, &ctx->wait_event);
	if (sc_attach_mux(sd->sc, ctx, conn) < 0)
		return -1;
	ctx->sd = sd;
	se_fl_set(ctx->sd, SE_FL_RCV_MORE);
	if ((global.tune.options & GTUNE_USE_SPLICE) && !(global.tune.no_zero_copy_fwd & NO_ZERO_COPY_FWD_PT))
		se_fl_set(ctx->sd, SE_FL_MAY_FASTFWD_PROD|SE_FL_MAY_FASTFWD_CONS);

	TRACE_LEAVE(PT_EV_STRM_NEW, conn, sd->sc);
	return 0;
}

/* Retrieves a valid stream connector from this connection, or returns NULL.
 * For this mux, it's easy as we can only store a single stream connector.
 */
static struct stconn *mux_pt_get_first_sc(const struct connection *conn)
{
	struct mux_pt_ctx *ctx = conn->ctx;

	return pt_sc(ctx);
}

/* Destroy the mux and the associated connection if still attached to this mux
 * and no longer used */
static void mux_pt_destroy_meth(void *ctx)
{
	struct mux_pt_ctx *pt = ctx;

	TRACE_POINT(PT_EV_CONN_END, pt->conn, pt_sc(pt));
	if (se_fl_test(pt->sd, SE_FL_ORPHAN) || pt->conn->ctx != pt) {
		if (pt->conn->ctx != pt) {
			pt->sd = NULL;
		}
		mux_pt_destroy(pt);
	}
}

/*
 * Detach the stream from the connection and possibly release the connection.
 */
static void mux_pt_detach(struct sedesc *sd)
{
	struct connection *conn = sd->conn;
	struct mux_pt_ctx *ctx;

	TRACE_ENTER(PT_EV_STRM_END, conn, sd->sc);

	ctx = conn->ctx;

	/* Subscribe, to know if we got disconnected */
	if (!conn_is_back(conn) && conn->owner != NULL &&
	    !(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH))) {
		conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV, &ctx->wait_event);
	} else {
		/* There's no session attached to that connection, destroy it */
		TRACE_DEVEL("killing dead connection", PT_EV_STRM_END, conn, sd->sc);
		mux_pt_destroy(ctx);
	}

	TRACE_LEAVE(PT_EV_STRM_END);
}

/* returns the number of streams in use on a connection */
static int mux_pt_used_streams(struct connection *conn)
{
	struct mux_pt_ctx *ctx = conn->ctx;

	return (!se_fl_test(ctx->sd, SE_FL_ORPHAN) ? 1 : 0);
}

/* returns the number of streams still available on a connection */
static int mux_pt_avail_streams(struct connection *conn)
{
	return 1 - mux_pt_used_streams(conn);
}

static void mux_pt_shutr(struct stconn *sc, enum co_shr_mode mode)
{
	struct connection *conn = __sc_conn(sc);
	struct mux_pt_ctx *ctx = conn->ctx;

	TRACE_ENTER(PT_EV_STRM_SHUT, conn, sc);

	se_fl_clr(ctx->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
	if (conn_xprt_ready(conn) && conn->xprt->shutr)
		conn->xprt->shutr(conn, conn->xprt_ctx,
		    (mode == CO_SHR_DRAIN));
	else if (mode == CO_SHR_DRAIN)
		conn_ctrl_drain(conn);
	if (se_fl_test(ctx->sd, SE_FL_SHW))
		conn_full_close(conn);

	TRACE_LEAVE(PT_EV_STRM_SHUT, conn, sc);
}

static void mux_pt_shutw(struct stconn *sc, enum co_shw_mode mode)
{
	struct connection *conn = __sc_conn(sc);
	struct mux_pt_ctx *ctx = conn->ctx;

	TRACE_ENTER(PT_EV_STRM_SHUT, conn, sc);

	if (conn_xprt_ready(conn) && conn->xprt->shutw)
		conn->xprt->shutw(conn, conn->xprt_ctx,
		    (mode == CO_SHW_NORMAL));
	if (!se_fl_test(ctx->sd, SE_FL_SHR))
		conn_sock_shutw(conn, (mode == CO_SHW_NORMAL));
	else
		conn_full_close(conn);

	TRACE_LEAVE(PT_EV_STRM_SHUT, conn, sc);
}

/*
 * Called from the upper layer, to get more data
 *
 * The caller is responsible for defragmenting <buf> if necessary. But <flags>
 * must be tested to know the calling context. If CO_RFL_BUF_FLUSH is set, it
 * means the caller wants to flush input data (from the mux buffer and the
 * channel buffer) to be able to use kernel splicing or any kind of mux-to-mux
 * xfer. If CO_RFL_KEEP_RECV is set, the mux must always subscribe for read
 * events before giving back. CO_RFL_BUF_WET is set if <buf> is congested with
 * data scheduled for leaving soon. CO_RFL_BUF_NOT_STUCK is set to instruct the
 * mux it may optimize the data copy to <buf> if necessary. Otherwise, it should
 * copy as much data as possible.
 */
static size_t mux_pt_rcv_buf(struct stconn *sc, struct buffer *buf, size_t count, int flags)
{
	struct connection *conn = __sc_conn(sc);
	struct mux_pt_ctx *ctx = conn->ctx;
	size_t ret = 0;

	TRACE_ENTER(PT_EV_RX_DATA, conn, sc, buf, (size_t[]){count});

	if (!count) {
		se_fl_set(ctx->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
		goto end;
	}
	b_realign_if_empty(buf);
	ret = conn->xprt->rcv_buf(conn, conn->xprt_ctx, buf, count, flags);
	if (conn->flags & CO_FL_ERROR) {
		se_fl_clr(ctx->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
		if (conn_xprt_read0_pending(conn))
			se_fl_set(ctx->sd, SE_FL_EOS);
		se_fl_set(ctx->sd, SE_FL_ERROR);
		TRACE_DEVEL("error on connection", PT_EV_RX_DATA|PT_EV_CONN_ERR, conn, sc);
	}
	else if (conn_xprt_read0_pending(conn)) {
		se_fl_clr(ctx->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
		se_fl_set(ctx->sd, (SE_FL_EOI|SE_FL_EOS));
		TRACE_DEVEL("read0 on connection", PT_EV_RX_DATA, conn, sc);
	}
  end:
	TRACE_LEAVE(PT_EV_RX_DATA, conn, sc, buf, (size_t[]){ret});
	return ret;
}

/* Called from the upper layer, to send data */
static size_t mux_pt_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, int flags)
{
	struct connection *conn = __sc_conn(sc);
	struct mux_pt_ctx *ctx = conn->ctx;
	size_t ret;

	TRACE_ENTER(PT_EV_TX_DATA, conn, sc, buf, (size_t[]){count});

	ret = conn->xprt->snd_buf(conn, conn->xprt_ctx, buf, count, flags);

	if (ret > 0)
		b_del(buf, ret);

	if (conn->flags & CO_FL_ERROR) {
		if (conn_xprt_read0_pending(conn))
			se_fl_set(ctx->sd, SE_FL_EOS);
		se_fl_set_error(ctx->sd);
		TRACE_DEVEL("error on connection", PT_EV_TX_DATA|PT_EV_CONN_ERR, conn, sc);
	}

	TRACE_LEAVE(PT_EV_TX_DATA, conn, sc, buf, (size_t[]){ret});
	return ret;
}

static inline struct sedesc *mux_pt_opposite_sd(struct mux_pt_ctx *ctx)
{
	struct xref *peer;
	struct sedesc *sdo;

	peer = xref_get_peer_and_lock(&ctx->sd->xref);
	if (!peer)
		return NULL;

	sdo = container_of(peer, struct sedesc, xref);
	xref_unlock(&ctx->sd->xref, peer);
	return sdo;
}

static size_t mux_pt_nego_ff(struct stconn *sc, struct buffer *input, size_t count, unsigned int may_splice)
{
	struct connection *conn = __sc_conn(sc);
	struct mux_pt_ctx *ctx = conn->ctx;
	size_t ret = 0;

	TRACE_ENTER(PT_EV_TX_DATA, conn, sc, 0, (size_t[]){count});

	/* Use kernel splicing if it is supported by the sender and if there
	 * are no input data _AND_ no output data.
	 *
	 * TODO: It may be good to add a flag to send obuf data first if any,
	 *       and then data in pipe, or the opposite. For now, it is not
	 *       supported to mix data.
	 */
	if (!b_data(input) && may_splice) {
		if (conn->xprt->snd_pipe && (ctx->sd->iobuf.pipe || (pipes_used < global.maxpipes && (ctx->sd->iobuf.pipe = get_pipe())))) {
			ctx->sd->iobuf.offset = 0;
			ctx->sd->iobuf.data = 0;
			ret = count;
			goto out;
		}
		ctx->sd->iobuf.flags |= IOBUF_FL_NO_SPLICING;
		TRACE_DEVEL("Unable to allocate pipe for splicing, fallback to buffer", PT_EV_TX_DATA, conn, sc);
	}

	/* No buffer case */

  out:
	TRACE_LEAVE(PT_EV_TX_DATA, conn, sc, 0, (size_t[]){ret});
	return ret;
}

static size_t mux_pt_done_ff(struct stconn *sc)
{
	struct connection *conn = __sc_conn(sc);
	struct mux_pt_ctx *ctx = conn->ctx;
	struct sedesc *sd = ctx->sd;
	size_t total = 0;

	TRACE_ENTER(PT_EV_TX_DATA, conn, sc);

	if (sd->iobuf.pipe) {
		total = conn->xprt->snd_pipe(conn, conn->xprt_ctx, sd->iobuf.pipe, sd->iobuf.pipe->data);
		if (!sd->iobuf.pipe->data) {
			put_pipe(sd->iobuf.pipe);
			sd->iobuf.pipe = NULL;
		}
	}
	else {
		BUG_ON(sd->iobuf.buf);
	}

  out:
	if (conn->flags & CO_FL_ERROR) {
		if (conn_xprt_read0_pending(conn))
			se_fl_set(ctx->sd, SE_FL_EOS);
		se_fl_set_error(ctx->sd);
		TRACE_DEVEL("error on connection", PT_EV_TX_DATA|PT_EV_CONN_ERR, conn, sc);
	}

	TRACE_LEAVE(PT_EV_TX_DATA, conn, sc, 0, (size_t[]){total});
	return total;
}

static int mux_pt_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags)
{
	struct connection *conn = __sc_conn(sc);
	struct mux_pt_ctx *ctx = conn->ctx;
	struct sedesc *sdo = NULL;
	size_t total = 0, try = 0;
        int ret = 0;

	TRACE_ENTER(PT_EV_RX_DATA, conn, sc, 0, (size_t[]){count});

	se_fl_clr(ctx->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
	conn->flags &= ~CO_FL_WAIT_ROOM;
	sdo = mux_pt_opposite_sd(ctx);
	if (!sdo) {
		TRACE_STATE("Opposite endpoint not available yet", PT_EV_RX_DATA, conn, sc);
		goto out;
	}

	try = se_nego_ff(sdo, &BUF_NULL, count, conn->xprt->rcv_pipe && !!(flags & CO_RFL_MAY_SPLICE) && !(sdo->iobuf.flags & IOBUF_FL_NO_SPLICING));
	if (sdo->iobuf.flags & IOBUF_FL_NO_FF) {
		/* Fast forwarding is not supported by the consumer */
		se_fl_clr(ctx->sd, SE_FL_MAY_FASTFWD_PROD);
		TRACE_DEVEL("Fast-forwarding not supported by opposite endpoint, disable it", PT_EV_RX_DATA, conn, sc);
		goto end;
	}
	if (sdo->iobuf.flags & IOBUF_FL_FF_BLOCKED) {
		se_fl_set(ctx->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
		TRACE_STATE("waiting for more room", PT_EV_RX_DATA|PT_EV_STRM_ERR, conn, sc);
		goto out;
	}

	total += sdo->iobuf.data;

	if (sdo->iobuf.pipe) {
		/* Here, not data was xferred */
		ret = conn->xprt->rcv_pipe(conn, conn->xprt_ctx, sdo->iobuf.pipe, try);
		if (ret < 0) {
			TRACE_ERROR("Error when trying to fast-forward data, disable it and abort",
				    PT_EV_RX_DATA|PT_EV_STRM_ERR|PT_EV_CONN_ERR, conn, sc);
			se_fl_clr(ctx->sd, SE_FL_MAY_FASTFWD_PROD);
			BUG_ON(sdo->iobuf.pipe->data);
			put_pipe(sdo->iobuf.pipe);
			sdo->iobuf.pipe = NULL;
			goto end;
		}
		total += ret;
	}
	else {
		BUG_ON(sdo->iobuf.buf);
		ret = -1; /* abort splicing for now and fallback to buffer mode */
		goto end;
	}

	ret = total;
	se_done_ff(sdo);

	if (sdo->iobuf.pipe) {
		se_fl_set(ctx->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
	}

	TRACE_DEVEL("Data fast-forwarded", PT_EV_RX_DATA, conn, sc, 0, (size_t[]){ret});


 out:
	if (conn->flags & CO_FL_ERROR) {
		if (conn_xprt_read0_pending(conn))
			se_fl_set(ctx->sd, SE_FL_EOS);
		se_fl_set(ctx->sd, SE_FL_ERROR);
		TRACE_DEVEL("error on connection", PT_EV_RX_DATA|PT_EV_CONN_ERR, conn, sc);
	}
	else if (conn_xprt_read0_pending(conn))  {
		se_fl_set(ctx->sd, (SE_FL_EOS|SE_FL_EOI));
		TRACE_DEVEL("read0 on connection", PT_EV_RX_DATA, conn, sc);
	}
  end:
	TRACE_LEAVE(PT_EV_RX_DATA, conn, sc, 0, (size_t[]){ret});
	return ret;
}

static int mux_pt_resume_fastfwd(struct stconn *sc, unsigned int flags)
{
	struct connection *conn = __sc_conn(sc);
	struct mux_pt_ctx *ctx = conn->ctx;
	struct sedesc *sd = ctx->sd;
	size_t total = 0;

	TRACE_ENTER(PT_EV_TX_DATA, conn, sc, 0, (size_t[]){flags});

	if (sd->iobuf.pipe) {
		total = conn->xprt->snd_pipe(conn, conn->xprt_ctx, sd->iobuf.pipe, sd->iobuf.pipe->data);
		if (!sd->iobuf.pipe->data) {
			put_pipe(sd->iobuf.pipe);
			sd->iobuf.pipe = NULL;
		}
	}
	else {
		BUG_ON(sd->iobuf.buf);
	}

  out:
	if (conn->flags & CO_FL_ERROR) {
		if (conn_xprt_read0_pending(conn))
			se_fl_set(ctx->sd, SE_FL_EOS);
		se_fl_set_error(ctx->sd);
		TRACE_DEVEL("error on connection", PT_EV_TX_DATA|PT_EV_CONN_ERR, conn, sc);
	}

	TRACE_LEAVE(PT_EV_TX_DATA, conn, sc, 0, (size_t[]){total});
	return total;
}

/* Called from the upper layer, to subscribe <es> to events <event_type>. The
 * event subscriber <es> is not allowed to change from a previous call as long
 * as at least one event is still subscribed. The <event_type> must only be a
 * combination of SUB_RETRY_RECV and SUB_RETRY_SEND. It always returns 0.
 */
static int mux_pt_subscribe(struct stconn *sc, int event_type, struct wait_event *es)
{
	struct connection *conn = __sc_conn(sc);

	TRACE_POINT(PT_EV_RX_DATA|PT_EV_TX_DATA, conn, sc, 0, (size_t[]){event_type});
	return conn->xprt->subscribe(conn, conn->xprt_ctx, event_type, es);
}

/* Called from the upper layer, to unsubscribe <es> from events <event_type>.
 * The <es> pointer is not allowed to differ from the one passed to the
 * subscribe() call. It always returns zero.
 */
static int mux_pt_unsubscribe(struct stconn *sc, int event_type, struct wait_event *es)
{
	struct connection *conn = __sc_conn(sc);

	TRACE_POINT(PT_EV_RX_DATA|PT_EV_TX_DATA, conn, sc, 0, (size_t[]){event_type});
	return conn->xprt->unsubscribe(conn, conn->xprt_ctx, event_type, es);
}

static int mux_pt_ctl(struct connection *conn, enum mux_ctl_type mux_ctl, void *output)
{
	int ret = 0;
	switch (mux_ctl) {
	case MUX_CTL_STATUS:
		if (!(conn->flags & CO_FL_WAIT_XPRT))
			ret |= MUX_STATUS_READY;
		return ret;
	case MUX_CTL_EXIT_STATUS:
		return MUX_ES_UNKNOWN;
	default:
		return -1;
	}
}

static int mux_pt_sctl(struct stconn *sc, enum mux_sctl_type mux_sctl, void *output)
{
	int ret = 0;

	switch (mux_sctl) {
	case MUX_SCTL_SID:
		if (output)
			*((int64_t *)output) = 0;
		return ret;

	default:
		return -1;
	}
}

/* config parser for global "tune.pt.zero-copy-forwarding" */
static int cfg_parse_pt_zero_copy_fwd(char **args, int section_type, struct proxy *curpx,
				      const struct proxy *defpx, const char *file, int line,
				      char **err)
{
	if (too_many_args(1, args, err, NULL))
		return -1;

	if (strcmp(args[1], "on") == 0)
		global.tune.no_zero_copy_fwd &= ~NO_ZERO_COPY_FWD_PT;
	else if (strcmp(args[1], "off") == 0)
		global.tune.no_zero_copy_fwd |= NO_ZERO_COPY_FWD_PT;
	else {
		memprintf(err, "'%s' expects 'on' or 'off'.", args[0]);
		return -1;
	}
	return 0;
}


/* config keyword parsers */
static struct cfg_kw_list cfg_kws = {ILH, {
	{ CFG_GLOBAL, "tune.pt.zero-copy-forwarding", cfg_parse_pt_zero_copy_fwd },
	{ 0, NULL, NULL }
}};

INITCALL1(STG_REGISTER, cfg_register_keywords, &cfg_kws);


/* The mux operations */
const struct mux_ops mux_tcp_ops = {
	.init = mux_pt_init,
	.wake = mux_pt_wake,
	.rcv_buf = mux_pt_rcv_buf,
	.snd_buf = mux_pt_snd_buf,
	.nego_fastfwd = mux_pt_nego_ff,
	.done_fastfwd = mux_pt_done_ff,
	.fastfwd = mux_pt_fastfwd,
	.resume_fastfwd = mux_pt_resume_fastfwd,
	.subscribe = mux_pt_subscribe,
	.unsubscribe = mux_pt_unsubscribe,
	.attach = mux_pt_attach,
	.get_first_sc = mux_pt_get_first_sc,
	.detach = mux_pt_detach,
	.avail_streams = mux_pt_avail_streams,
	.used_streams = mux_pt_used_streams,
	.destroy = mux_pt_destroy_meth,
	.ctl = mux_pt_ctl,
	.sctl = mux_pt_sctl,
	.shutr = mux_pt_shutr,
	.shutw = mux_pt_shutw,
	.flags = MX_FL_NONE,
	.name = "PASS",
};


const struct mux_ops mux_pt_ops = {
	.init = mux_pt_init,
	.wake = mux_pt_wake,
	.rcv_buf = mux_pt_rcv_buf,
	.snd_buf = mux_pt_snd_buf,
	.nego_fastfwd = mux_pt_nego_ff,
	.done_fastfwd = mux_pt_done_ff,
	.fastfwd = mux_pt_fastfwd,
	.resume_fastfwd = mux_pt_resume_fastfwd,
	.subscribe = mux_pt_subscribe,
	.unsubscribe = mux_pt_unsubscribe,
	.attach = mux_pt_attach,
	.get_first_sc = mux_pt_get_first_sc,
	.detach = mux_pt_detach,
	.avail_streams = mux_pt_avail_streams,
	.used_streams = mux_pt_used_streams,
	.destroy = mux_pt_destroy_meth,
	.ctl = mux_pt_ctl,
	.sctl = mux_pt_sctl,
	.shutr = mux_pt_shutr,
	.shutw = mux_pt_shutw,
	.flags = MX_FL_NONE|MX_FL_NO_UPG,
	.name = "PASS",
};

/* PROT selection : default mux has empty name */
static struct mux_proto_list mux_proto_none =
	{ .token = IST("none"), .mode = PROTO_MODE_TCP, .side = PROTO_SIDE_BOTH, .mux = &mux_pt_ops };
static struct mux_proto_list mux_proto_tcp =
	{ .token = IST(""), .mode = PROTO_MODE_TCP, .side = PROTO_SIDE_BOTH, .mux = &mux_tcp_ops };

INITCALL1(STG_REGISTER, register_mux_proto, &mux_proto_none);
INITCALL1(STG_REGISTER, register_mux_proto, &mux_proto_tcp);