summaryrefslogtreecommitdiffstats
path: root/vendor/react/http/src/Middleware/LimitConcurrentRequestsMiddleware.php
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 12:38:42 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 12:38:42 +0000
commitc3ca98e1b35123f226c7f4c596b5dee78caa4223 (patch)
tree9b6eb109283da55e7d9064baa9fac795a40264cb /vendor/react/http/src/Middleware/LimitConcurrentRequestsMiddleware.php
parentInitial commit. (diff)
downloadicinga-php-thirdparty-c3ca98e1b35123f226c7f4c596b5dee78caa4223.tar.xz
icinga-php-thirdparty-c3ca98e1b35123f226c7f4c596b5dee78caa4223.zip
Adding upstream version 0.11.0.upstream/0.11.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/react/http/src/Middleware/LimitConcurrentRequestsMiddleware.php')
-rw-r--r--vendor/react/http/src/Middleware/LimitConcurrentRequestsMiddleware.php211
1 files changed, 211 insertions, 0 deletions
diff --git a/vendor/react/http/src/Middleware/LimitConcurrentRequestsMiddleware.php b/vendor/react/http/src/Middleware/LimitConcurrentRequestsMiddleware.php
new file mode 100644
index 0000000..5333810
--- /dev/null
+++ b/vendor/react/http/src/Middleware/LimitConcurrentRequestsMiddleware.php
@@ -0,0 +1,211 @@
+<?php
+
+namespace React\Http\Middleware;
+
+use Psr\Http\Message\ResponseInterface;
+use Psr\Http\Message\ServerRequestInterface;
+use React\Http\Io\HttpBodyStream;
+use React\Http\Io\PauseBufferStream;
+use React\Promise;
+use React\Promise\PromiseInterface;
+use React\Promise\Deferred;
+use React\Stream\ReadableStreamInterface;
+
+/**
+ * Limits how many next handlers can be executed concurrently.
+ *
+ * If this middleware is invoked, it will check if the number of pending
+ * handlers is below the allowed limit and then simply invoke the next handler
+ * and it will return whatever the next handler returns (or throws).
+ *
+ * If the number of pending handlers exceeds the allowed limit, the request will
+ * be queued (and its streaming body will be paused) and it will return a pending
+ * promise.
+ * Once a pending handler returns (or throws), it will pick the oldest request
+ * from this queue and invokes the next handler (and its streaming body will be
+ * resumed).
+ *
+ * The following example shows how this middleware can be used to ensure no more
+ * than 10 handlers will be invoked at once:
+ *
+ * ```php
+ * $http = new React\Http\HttpServer(
+ * new React\Http\Middleware\StreamingRequestMiddleware(),
+ * new React\Http\Middleware\LimitConcurrentRequestsMiddleware(10),
+ * $handler
+ * );
+ * ```
+ *
+ * Similarly, this middleware is often used in combination with the
+ * [`RequestBodyBufferMiddleware`](#requestbodybuffermiddleware) (see below)
+ * to limit the total number of requests that can be buffered at once:
+ *
+ * ```php
+ * $http = new React\Http\HttpServer(
+ * new React\Http\Middleware\StreamingRequestMiddleware(),
+ * new React\Http\Middleware\LimitConcurrentRequestsMiddleware(100), // 100 concurrent buffering handlers
+ * new React\Http\Middleware\RequestBodyBufferMiddleware(2 * 1024 * 1024), // 2 MiB per request
+ * new React\Http\Middleware\RequestBodyParserMiddleware(),
+ * $handler
+ * );
+ * ```
+ *
+ * More sophisticated examples include limiting the total number of requests
+ * that can be buffered at once and then ensure the actual request handler only
+ * processes one request after another without any concurrency:
+ *
+ * ```php
+ * $http = new React\Http\HttpServer(
+ * new React\Http\Middleware\StreamingRequestMiddleware(),
+ * new React\Http\Middleware\LimitConcurrentRequestsMiddleware(100), // 100 concurrent buffering handlers
+ * new React\Http\Middleware\RequestBodyBufferMiddleware(2 * 1024 * 1024), // 2 MiB per request
+ * new React\Http\Middleware\RequestBodyParserMiddleware(),
+ * new React\Http\Middleware\LimitConcurrentRequestsMiddleware(1), // only execute 1 handler (no concurrency)
+ * $handler
+ * );
+ * ```
+ *
+ * @see RequestBodyBufferMiddleware
+ */
+final class LimitConcurrentRequestsMiddleware
+{
+ private $limit;
+ private $pending = 0;
+ private $queue = array();
+
+ /**
+ * @param int $limit Maximum amount of concurrent requests handled.
+ *
+ * For example when $limit is set to 10, 10 requests will flow to $next
+ * while more incoming requests have to wait until one is done.
+ */
+ public function __construct($limit)
+ {
+ $this->limit = $limit;
+ }
+
+ public function __invoke(ServerRequestInterface $request, $next)
+ {
+ // happy path: simply invoke next request handler if we're below limit
+ if ($this->pending < $this->limit) {
+ ++$this->pending;
+
+ try {
+ $response = $next($request);
+ } catch (\Exception $e) {
+ $this->processQueue();
+ throw $e;
+ } catch (\Throwable $e) { // @codeCoverageIgnoreStart
+ // handle Errors just like Exceptions (PHP 7+ only)
+ $this->processQueue();
+ throw $e; // @codeCoverageIgnoreEnd
+ }
+
+ // happy path: if next request handler returned immediately,
+ // we can simply try to invoke the next queued request
+ if ($response instanceof ResponseInterface) {
+ $this->processQueue();
+ return $response;
+ }
+
+ // if the next handler returns a pending promise, we have to
+ // await its resolution before invoking next queued request
+ return $this->await(Promise\resolve($response));
+ }
+
+ // if we reach this point, then this request will need to be queued
+ // check if the body is streaming, in which case we need to buffer everything
+ $body = $request->getBody();
+ if ($body instanceof ReadableStreamInterface) {
+ // pause actual body to stop emitting data until the handler is called
+ $size = $body->getSize();
+ $body = new PauseBufferStream($body);
+ $body->pauseImplicit();
+
+ // replace with buffering body to ensure any readable events will be buffered
+ $request = $request->withBody(new HttpBodyStream(
+ $body,
+ $size
+ ));
+ }
+
+ // get next queue position
+ $queue =& $this->queue;
+ $queue[] = null;
+ \end($queue);
+ $id = \key($queue);
+
+ $deferred = new Deferred(function ($_, $reject) use (&$queue, $id) {
+ // queued promise cancelled before its next handler is invoked
+ // remove from queue and reject explicitly
+ unset($queue[$id]);
+ $reject(new \RuntimeException('Cancelled queued next handler'));
+ });
+
+ // queue request and process queue if pending does not exceed limit
+ $queue[$id] = $deferred;
+
+ $pending = &$this->pending;
+ $that = $this;
+ return $deferred->promise()->then(function () use ($request, $next, $body, &$pending, $that) {
+ // invoke next request handler
+ ++$pending;
+
+ try {
+ $response = $next($request);
+ } catch (\Exception $e) {
+ $that->processQueue();
+ throw $e;
+ } catch (\Throwable $e) { // @codeCoverageIgnoreStart
+ // handle Errors just like Exceptions (PHP 7+ only)
+ $that->processQueue();
+ throw $e; // @codeCoverageIgnoreEnd
+ }
+
+ // resume readable stream and replay buffered events
+ if ($body instanceof PauseBufferStream) {
+ $body->resumeImplicit();
+ }
+
+ // if the next handler returns a pending promise, we have to
+ // await its resolution before invoking next queued request
+ return $that->await(Promise\resolve($response));
+ });
+ }
+
+ /**
+ * @internal
+ * @param PromiseInterface $promise
+ * @return PromiseInterface
+ */
+ public function await(PromiseInterface $promise)
+ {
+ $that = $this;
+
+ return $promise->then(function ($response) use ($that) {
+ $that->processQueue();
+
+ return $response;
+ }, function ($error) use ($that) {
+ $that->processQueue();
+
+ return Promise\reject($error);
+ });
+ }
+
+ /**
+ * @internal
+ */
+ public function processQueue()
+ {
+ // skip if we're still above concurrency limit or there's no queued request waiting
+ if (--$this->pending >= $this->limit || !$this->queue) {
+ return;
+ }
+
+ $first = \reset($this->queue);
+ unset($this->queue[key($this->queue)]);
+
+ $first->resolve();
+ }
+}