summaryrefslogtreecommitdiffstats
path: root/vendor/react/http/src/Middleware/LimitConcurrentRequestsMiddleware.php
blob: 5333810085df2bd6453fd3807b1f329d1004bd52 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
<?php

namespace React\Http\Middleware;

use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use React\Http\Io\HttpBodyStream;
use React\Http\Io\PauseBufferStream;
use React\Promise;
use React\Promise\PromiseInterface;
use React\Promise\Deferred;
use React\Stream\ReadableStreamInterface;

/**
 * Limits how many next handlers can be executed concurrently.
 *
 * If this middleware is invoked, it will check if the number of pending
 * handlers is below the allowed limit and then simply invoke the next handler
 * and it will return whatever the next handler returns (or throws).
 *
 * If the number of pending handlers has reached the allowed limit, the request
 * will be queued (and its streaming body will be paused) and it will return a
 * pending promise.
 * Once a pending handler returns (or throws), it will pick the oldest request
 * from this queue and invoke the next handler (and its streaming body will be
 * resumed).
 *
 * The following example shows how this middleware can be used to ensure no more
 * than 10 handlers will be invoked at once:
 *
 * ```php
 * $http = new React\Http\HttpServer(
 *     new React\Http\Middleware\StreamingRequestMiddleware(),
 *     new React\Http\Middleware\LimitConcurrentRequestsMiddleware(10),
 *     $handler
 * );
 * ```
 *
 * Similarly, this middleware is often used in combination with the
 * [`RequestBodyBufferMiddleware`](#requestbodybuffermiddleware) (see below)
 * to limit the total number of requests that can be buffered at once:
 *
 * ```php
 * $http = new React\Http\HttpServer(
 *     new React\Http\Middleware\StreamingRequestMiddleware(),
 *     new React\Http\Middleware\LimitConcurrentRequestsMiddleware(100), // 100 concurrent buffering handlers
 *     new React\Http\Middleware\RequestBodyBufferMiddleware(2 * 1024 * 1024), // 2 MiB per request
 *     new React\Http\Middleware\RequestBodyParserMiddleware(),
 *     $handler
 * );
 * ```
 *
 * More sophisticated examples include limiting the total number of requests
 * that can be buffered at once and then ensure the actual request handler only
 * processes one request after another without any concurrency:
 *
 * ```php
 * $http = new React\Http\HttpServer(
 *     new React\Http\Middleware\StreamingRequestMiddleware(),
 *     new React\Http\Middleware\LimitConcurrentRequestsMiddleware(100), // 100 concurrent buffering handlers
 *     new React\Http\Middleware\RequestBodyBufferMiddleware(2 * 1024 * 1024), // 2 MiB per request
 *     new React\Http\Middleware\RequestBodyParserMiddleware(),
 *     new React\Http\Middleware\LimitConcurrentRequestsMiddleware(1), // only execute 1 handler (no concurrency)
 *     $handler
 * );
 * ```
 *
 * @see RequestBodyBufferMiddleware
 */
final class LimitConcurrentRequestsMiddleware
{
    /** @var int maximum number of next handlers allowed to run at the same time */
    private $limit;

    /** @var int number of next handlers that were invoked and have not completed yet */
    private $pending = 0;

    /** @var Deferred[] waiting requests in arrival order, keyed by their queue position */
    private $queue = array();

    /**
     * @param int $limit Maximum number of requests that may be handled concurrently.
     *
     * With a $limit of 10, for instance, up to 10 requests are passed on to
     * $next at once; any further request waits until one of them completes.
     */
    public function __construct($limit)
    {
        $this->limit = $limit;
    }

    /**
     * Passes the request to $next right away while a slot is free, otherwise
     * parks it in the queue until an earlier handler has completed.
     *
     * @param ServerRequestInterface $request
     * @param callable $next next request handler, invoked with the request
     * @return ResponseInterface|PromiseInterface the response itself if $next
     *     returned one synchronously, a promise in every other case
     * @throws \Exception|\Throwable whatever $next throws on the immediate path
     */
    public function __invoke(ServerRequestInterface $request, $next)
    {
        if ($this->pending < $this->limit) {
            return $this->handleImmediately($request, $next);
        }

        return $this->enqueue($request, $next);
    }

    /**
     * Runs $next now and releases the occupied slot once it has completed.
     *
     * @param ServerRequestInterface $request
     * @param callable $next
     * @return ResponseInterface|PromiseInterface
     */
    private function handleImmediately(ServerRequestInterface $request, $next)
    {
        ++$this->pending;

        try {
            $result = $next($request);
        } catch (\Exception $exception) {
            // a throwing handler is done as well: free the slot, then rethrow
            $this->processQueue();
            throw $exception;
        } catch (\Throwable $error) { // @codeCoverageIgnoreStart
            // PHP 7+ only: an Error frees the slot exactly like an Exception
            $this->processQueue();
            throw $error; // @codeCoverageIgnoreEnd
        }

        if (!($result instanceof ResponseInterface)) {
            // anything else is wrapped in a promise, so the slot is only
            // released after that promise has settled
            return $this->await(Promise\resolve($result));
        }

        // a response returned synchronously means this handler is already done
        $this->processQueue();

        return $result;
    }

    /**
     * Parks the request until a slot becomes free and returns a promise for
     * the eventual outcome of $next.
     *
     * Cancelling the returned promise while the request is still waiting
     * removes it from the queue and rejects with a RuntimeException.
     *
     * @param ServerRequestInterface $request
     * @param callable $next
     * @return PromiseInterface
     */
    private function enqueue(ServerRequestInterface $request, $next)
    {
        $body = $request->getBody();
        if ($body instanceof ReadableStreamInterface) {
            // remember the size first, then wrap and pause the streaming body
            // so it stops emitting data until the handler is finally called
            $size = $body->getSize();

            $body = new PauseBufferStream($body);
            $body->pauseImplicit();

            // hand the handler the buffering body so readable events are kept
            $request = $request->withBody(new HttpBodyStream($body, $size));
        }

        // append a placeholder to learn which key this request occupies;
        // the reference lets the canceller below edit the private queue
        $queue =& $this->queue;
        $queue[] = null;
        \end($queue);
        $position = \key($queue);

        $ticket = new Deferred(function ($_, $reject) use (&$queue, $position) {
            // cancelled while still waiting: give up the queue position and
            // reject explicitly
            unset($queue[$position]);
            $reject(new \RuntimeException('Cancelled queued next handler'));
        });

        // swap the placeholder for the real entry that processQueue() resolves
        $queue[$position] = $ticket;

        $pending =& $this->pending;
        $self = $this;
        $turn = $ticket->promise();

        return $turn->then(function () use ($request, $next, $body, &$pending, $self) {
            // it is this request's turn: occupy a slot and run the handler
            $pending++;

            try {
                $result = $next($request);
            } catch (\Exception $exception) {
                $self->processQueue();
                throw $exception;
            } catch (\Throwable $error) { // @codeCoverageIgnoreStart
                // PHP 7+ only: an Error frees the slot exactly like an Exception
                $self->processQueue();
                throw $error; // @codeCoverageIgnoreEnd
            }

            // the handler has been called, so let the paused body flow again
            // and replay whatever was buffered in the meantime
            if ($body instanceof PauseBufferStream) {
                $body->resumeImplicit();
            }

            // keep the slot occupied until the handler's result has settled
            return $self->await(Promise\resolve($result));
        });
    }

    /**
     * Releases one slot as soon as the given promise settles and passes its
     * outcome (value or rejection reason) through unchanged.
     *
     * @internal
     * @param PromiseInterface $promise
     * @return PromiseInterface
     */
    public function await(PromiseInterface $promise)
    {
        $self = $this;

        $onFulfilled = function ($response) use ($self) {
            $self->processQueue();

            return $response;
        };

        $onRejected = function ($error) use ($self) {
            $self->processQueue();

            return Promise\reject($error);
        };

        return $promise->then($onFulfilled, $onRejected);
    }

    /**
     * Marks one handler as completed and, if that brings the number of
     * pending handlers below the limit, starts the oldest waiting request.
     *
     * @internal
     */
    public function processQueue()
    {
        // the slot is given back unconditionally
        --$this->pending;

        // still at or above the concurrency limit: nobody may start yet
        if ($this->pending >= $this->limit) {
            return;
        }

        // no request is waiting
        if (!$this->queue) {
            return;
        }

        // take the oldest entry by key so the remaining keys stay untouched;
        // cancellers of queued requests remove their entry by that same key
        $oldest = \reset($this->queue);
        $position = \key($this->queue);
        unset($this->queue[$position]);

        $oldest->resolve();
    }
}