summaryrefslogtreecommitdiffstats
path: root/modules/http2/h2_mplx.h
blob: 1f79aa8248a473538c8d5a18e8ca2f8c3c3ade84 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
/* Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef __mod_h2__h2_mplx__
#define __mod_h2__h2_mplx__

/**
 * The stream multiplexer. It performs communication between the
 * primary HTTP/2 connection (c1) and the secondary connections (c2)
 * that process the requests, aka. HTTP/2 streams.
 *
 * There is one h2_mplx instance for each h2_session.
 *
 * Naming Convention:
 * "h2_mplx_c1_" are methods only to be called by the primary connection
 * "h2_mplx_c2_" are methods only to be called by a secondary connection
 * "h2_mplx_worker_" are methods only to be called by a h2 worker thread
 */

/* Forward declarations of struct tags, so that pointers to these types
 * can be named without including the headers that define them. */
struct apr_pool_t;
struct apr_thread_mutex_t;
struct apr_thread_cond_t;
struct h2_bucket_beam;
struct h2_config;
struct h2_ihash_t;
struct h2_stream;
struct h2_request;
/* used by the prototypes of h2_mplx_c1_process() and
 * h2_mplx_c1_reprioritize() below */
struct h2_session;
struct h2_workers;
struct h2_iqueue;

#include <apr_queue.h>

#include "h2_workers.h"

typedef struct h2_c2_transit h2_c2_transit;

/**
 * A pool together with a bucket allocator.
 * The h2_mplx field comments describe transits as "base pools for running
 * c2 connections", with a bounded number of them kept idle for reuse.
 * NOTE(review): whether bucket_alloc is created from pool is not visible
 * in this header -- confirm in h2_mplx.c.
 */
struct h2_c2_transit {
    apr_pool_t *pool;                 /* the transit's memory pool */
    apr_bucket_alloc_t *bucket_alloc; /* the transit's bucket allocator */
};

typedef struct h2_mplx h2_mplx;

/**
 * State of one stream multiplexer. It connects the primary connection (c1)
 * with the secondary connections (c2) that process the streams.
 * NOTE(review): which fields are guarded by `lock` and which by `poll_lock`
 * (beyond the queues named at `poll_lock`) is not visible in this header --
 * confirm in h2_mplx.c before touching fields from another thread.
 */
struct h2_mplx {
    int child_num;                  /* child this runs in */
    apr_uint32_t id;                /* id unique per child */
    conn_rec *c1;                   /* the main connection */
    apr_pool_t *pool;               /* pool of this mplx -- presumably owns its allocations, TODO confirm */
    struct h2_stream *stream0;      /* HTTP/2's stream 0 */
    server_rec *s;                  /* server for master conn */

    int shutdown;                   /* we are shutting down */
    int aborted;                    /* we need to get out of here asap */
    int polling;                    /* is waiting/processing pollset events */
    ap_conn_producer_t *producer;   /* registered producer at h2_workers */

    struct h2_ihash_t *streams;     /* all streams active */
    struct h2_ihash_t *shold;       /* all streams done with c2 processing ongoing */
    apr_array_header_t *spurge;     /* all streams done, ready for destroy */
    
    struct h2_iqueue *q;            /* all stream ids that need to be started */

    apr_size_t stream_max_mem;      /* max memory to buffer for a stream */
    apr_uint32_t max_streams;       /* max # of concurrent streams */
    apr_uint32_t max_stream_id_started; /* highest stream id that started processing */

    apr_uint32_t processing_count;  /* # of c2 working for this mplx */
    apr_uint32_t processing_limit;  /* current limit on processing c2s, dynamic */
    apr_uint32_t processing_max;    /* max, hard limit of processing c2s */
    
    apr_time_t last_mood_change;    /* last time, processing limit changed */
    apr_interval_time_t mood_update_interval; /* how frequent we update at most */
    /* NOTE(review): the comment below speaks of negative values ("<0"), but
     * the field is unsigned; a decrement below 0 wraps around and a test
     * like `irritations_since < 0` can never be true. Confirm how
     * h2_mplx.c counts happy events. */
    apr_uint32_t irritations_since; /* irritations (>0) or happy events (<0) since last mood change */

    apr_thread_mutex_t *lock;       /* mutex of this mplx -- presumably guards its state, TODO confirm */
    struct apr_thread_cond_t *join_wait; /* condition variable -- presumably waited on while
                                          * c2 processing winds down, TODO confirm */
    
    apr_pollset_t *pollset;         /* pollset for c1/c2 IO events */
    apr_array_header_t *streams_ev_in;  /* presumably streams with input events from polling, TODO confirm */
    apr_array_header_t *streams_ev_out; /* presumably streams with output events from polling, TODO confirm */

    apr_thread_mutex_t *poll_lock; /* protect modifications of queues below */
    struct h2_iqueue *streams_input_read;  /* streams whose input has been read from */
    struct h2_iqueue *streams_output_written; /* streams whose output has been written to */

    struct h2_workers *workers;     /* h2 workers process wide instance */

    request_rec *scratch_r;         /* pseudo request_rec for scoreboard reporting */

    apr_uint32_t max_spare_transits; /* max number of transit pools idling */
    apr_array_header_t *c2_transits; /* base pools for running c2 connections */
};

/**
 * Initialization entry point of the mplx code.
 * NOTE(review): judging by the name this is run once per child process;
 * what it sets up is not visible in this header -- see h2_mplx.c.
 * @param pool the pool handed to the initialization
 * @param s the server handed to the initialization
 */
apr_status_t h2_mplx_c1_child_init(apr_pool_t *pool, server_rec *s);

/**
 * Create the multiplexer for the given HTTP2 session. 
 * Implicitly has reference count 1.
 *
 * NOTE(review): the parameter descriptions are inferred from the h2_mplx
 * fields of matching name/type -- confirm against h2_mplx.c.
 * @param child_id number of the child this mplx runs in
 * @param id id of the mplx, unique per child
 * @param stream0 HTTP/2's stream 0 of the session
 * @param s the server of the primary connection
 * @param master a pool; presumably the parent of the pool of the new mplx
 * @param workers the h2 workers instance that processes the streams
 */
h2_mplx *h2_mplx_c1_create(int child_id, apr_uint32_t id,
                           struct h2_stream *stream0,
                           server_rec *s, apr_pool_t *master,
                           struct h2_workers *workers);

/**
 * Destroy the mplx, shutting down all ongoing processing.
 * @param m the mplx destroyed
 */
void h2_mplx_c1_destroy(h2_mplx *m);

/**
 * Shut down the multiplexer gracefully. Will no longer schedule new streams
 * but let the ongoing ones finish normally.
 * @param m the mplx to shut down
 * @return the highest stream id being/been processed
 */
int h2_mplx_c1_shutdown(h2_mplx *m);

/**
 * Notifies mplx that a stream has been completely handled on the main
 * connection and is ready for cleanup.
 * 
 * @param m the mplx itself
 * @param stream the stream ready for cleanup
 * @param pstream_count on return, the number of streams active
 */
apr_status_t h2_mplx_c1_stream_cleanup(h2_mplx *m, struct h2_stream *stream,
                                       unsigned int *pstream_count);

/**
 * Ask whether a stream is running.
 * NOTE(review): "running" presumably means that c2 processing of the
 * stream is ongoing, with non-zero meaning yes -- confirm in h2_mplx.c.
 * @param m the mplx
 * @param stream the stream in question
 */
int h2_mplx_c1_stream_is_running(h2_mplx *m, struct h2_stream *stream);

/**
 * Process a stream request.
 * 
 * @param m the multiplexer
 * @param read_to_process queue of stream ids; presumably the streams that
 *                        are ready to be processed -- TODO confirm
 * @param get_stream function to get a stream; presumably looks up a stream
 *                   of the session by its id -- TODO confirm
 * @param cmp the stream priority compare function
 * @param session the session; presumably handed to get_stream and cmp as
 *                their context -- TODO confirm
 * @param pstream_count on return the number of streams active in mplx
 */
void h2_mplx_c1_process(h2_mplx *m,
                        struct h2_iqueue *read_to_process,
                        h2_stream_get_fn *get_stream,
                        h2_stream_pri_cmp_fn *cmp,
                        struct h2_session *session,
                        unsigned int *pstream_count);

/**
 * Stream priorities have changed, reschedule pending requests.
 * 
 * @param m the multiplexer
 * @param cmp the stream priority compare function
 * @param session the session; presumably serves as the context data for
 *                the compare function -- TODO confirm
 */
apr_status_t h2_mplx_c1_reprioritize(h2_mplx *m, h2_stream_pri_cmp_fn *cmp,
                                    struct h2_session *session);

/**
 * Callback type for stream events reported by h2_mplx_c1_poll().
 * @param ctx context data for the callback
 * @param stream the stream the event happened on
 */
typedef void stream_ev_callback(void *ctx, struct h2_stream *stream);

/**
 * Poll the primary connection for input and the active streams for output.
 * Invoke the callback for any stream where an event happened.
 *
 * @param m the mplx
 * @param timeout presumably the maximum time to wait for events --
 *                TODO confirm
 * @param on_stream_input callback for streams with an input event
 * @param on_stream_output callback for streams with an output event
 * @param on_ctx presumably passed as `ctx` to both callbacks --
 *               TODO confirm
 */
apr_status_t h2_mplx_c1_poll(h2_mplx *m, apr_interval_time_t timeout,
                            stream_ev_callback *on_stream_input,
                            stream_ev_callback *on_stream_output,
                            void *on_ctx);

/*
 * Notifications from a secondary connection about its I/O.
 * NOTE(review): presumably these record the stream of c2 in the h2_mplx
 * queues `streams_input_read` resp. `streams_output_written`, so that the
 * primary connection learns about it -- confirm in h2_mplx.c.
 */
void h2_mplx_c2_input_read(h2_mplx *m, conn_rec *c2);
void h2_mplx_c2_output_written(h2_mplx *m, conn_rec *c2);

/**
 * Callback type for h2_mplx_c1_streams_do().
 * NOTE(review): the meaning of the int return value (presumably whether
 * to continue the iteration) is not visible here -- confirm in h2_mplx.c.
 * @param s the stream visited
 * @param userdata the ctx given to h2_mplx_c1_streams_do()
 */
typedef int h2_mplx_stream_cb(struct h2_stream *s, void *userdata);

/**
 * Iterate over all streams known to mplx from the primary connection.
 * @param m the mplx
 * @param cb the callback to invoke on each stream
 * @param ctx userdata passed to the callback
 */
apr_status_t h2_mplx_c1_streams_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx);

/**
 * A stream has been RST_STREAM by the client. Abort
 * any processing going on and remove from processing
 * queue.
 * @param m the mplx
 * @param stream_id id of the stream that was reset
 */
apr_status_t h2_mplx_c1_client_rst(h2_mplx *m, int stream_id);

/**
 * Get readonly access to a stream for a secondary connection.
 * @param m the mplx
 * @param stream_id id of the stream wanted
 * @return the stream, not to be modified by the caller.
 *         NOTE(review): presumably NULL if no such stream is known --
 *         confirm in h2_mplx.c.
 */
const struct h2_stream *h2_mplx_c2_stream_get(h2_mplx *m, int stream_id);

/**
 * A h2 worker asks for a secondary connection to process.
 * @param m the mplx asked
 * @param out_c2 non-NULL, a pointer where to receive the next
 *               secondary connection to process.
 */
apr_status_t h2_mplx_worker_pop_c2(h2_mplx *m, conn_rec **out_c2);

/**
 * Prefix a log message format with the identity of a mplx.
 * Expands to a format string literal followed by its two arguments (child
 * number and mplx id), i.e. to a comma separated list. It is therefore only
 * usable where a printf-style format and its arguments are expected, and
 * the expansion as a whole cannot be wrapped in parentheses.
 *
 * @param m pointer to the mplx; any pointer expression (e.g. `&mplx`) is
 *          accepted, but it is evaluated twice, so it must be free of
 *          side effects
 * @param msg string literal holding the remainder of the format
 */
#define H2_MPLX_MSG(m, msg)     \
    "h2_mplx(%d-%lu): " msg, (m)->child_num, (unsigned long)(m)->id

#endif /* defined(__mod_h2__h2_mplx__) */