limit = $limit; }

    /**
     * Invokes the next handler immediately while fewer than `$this->limit`
     * handlers are pending; otherwise parks the request in the internal
     * queue until `processQueue()` resolves its deferred.
     *
     * A queued request whose body is a readable stream gets that body wrapped
     * in a paused buffering stream, which is resumed only after the next
     * handler has been invoked.
     *
     * @param ServerRequestInterface $request
     * @param callable               $next    invoked as `$next($request)`
     * @return ResponseInterface|PromiseInterface the handler's response when it
     *         returned one synchronously on the non-queued path, a promise otherwise
     * @throws \Exception|\Throwable anything the next handler throws synchronously
     *         on the non-queued path is rethrown after its slot has been released
     */
    public function __invoke(ServerRequestInterface $request, $next)
    {
        // happy path: simply invoke next request handler if we're below limit
        if ($this->pending < $this->limit) {
            ++$this->pending;

            try {
                $response = $next($request);
            } catch (\Exception $e) {
                // release the slot taken above before propagating the failure
                $this->processQueue();
                throw $e;
            } catch (\Throwable $e) { // @codeCoverageIgnoreStart
                // handle Errors just like Exceptions (PHP 7+ only)
                $this->processQueue();
                throw $e; // @codeCoverageIgnoreEnd
            }

            // happy path: if next request handler returned immediately,
            // we can simply try to invoke the next queued request
            if ($response instanceof ResponseInterface) {
                $this->processQueue();

                return $response;
            }

            // if the next handler returns a pending promise, we have to
            // await its resolution before invoking next queued request
            return $this->await(Promise\resolve($response));
        }

        // if we reach this point, then this request will need to be queued
        // check if the body is streaming, in which case we need to buffer everything
        $body = $request->getBody();
        if ($body instanceof ReadableStreamInterface) {
            // pause actual body to stop emitting data until the handler is called
            $size = $body->getSize();
            $body = new PauseBufferStream($body);
            $body->pauseImplicit();

            // replace with buffering body to ensure any readable events will be buffered
            $request = $request->withBody(new HttpBodyStream(
                $body,
                $size
            ));
        }

        // get next queue position: append a placeholder first so that the
        // key assigned by PHP can be read back and captured by the canceller
        $queue =& $this->queue;
        $queue[] = null;
        \end($queue);
        $id = \key($queue);

        $deferred = new Deferred(function ($_, $reject) use (&$queue, $id) {
            // queued promise cancelled before its next handler is invoked
            // remove from queue and reject explicitly
            unset($queue[$id]);
            $reject(new \RuntimeException('Cancelled queued next handler'));
        });

        // queue request and process queue if pending does not exceed limit
        $queue[$id] = $deferred;

        // the closure below receives the counter by reference and the instance
        // as $that rather than using $this directly
        $pending = &$this->pending;
        $that = $this;

        return $deferred->promise()->then(function () use ($request, $next, $body, &$pending, $that) {
            // invoke next request handler
            ++$pending;

            try {
                $response = $next($request);
            } catch (\Exception $e) {
                // release the slot taken above before propagating the failure
                $that->processQueue();
                throw $e;
            } catch (\Throwable $e) { // @codeCoverageIgnoreStart
                // handle Errors just like Exceptions (PHP 7+ only)
                $that->processQueue();
                throw $e; // @codeCoverageIgnoreEnd
            }

            // resume readable stream and replay buffered events
            if ($body instanceof PauseBufferStream) {
                $body->resumeImplicit();
            }

            // if the next handler returns a pending promise, we have to
            // await its resolution before invoking next queued request
            return $that->await(Promise\resolve($response));
        });
    }

    /**
     * Chains onto the given promise so that `processQueue()` runs once it
     * settles, whether it fulfills or rejects, and passes the outcome through
     * unchanged.
     *
     * @internal
     * @param PromiseInterface $promise
     * @return PromiseInterface
     */
    public function await(PromiseInterface $promise)
    {
        $that = $this;

        return $promise->then(function ($response) use ($that) {
            $that->processQueue();

            return $response;
        }, function ($error) use ($that) {
            $that->processQueue();

            // re-reject so the caller still observes the original failure
            return Promise\reject($error);
        });
    }

    /**
     * Releases one pending slot and, if that leaves the pending count below
     * the limit and the queue is not empty, removes the first queued deferred
     * and resolves it.
     *
     * @internal
     * @return void
     */
    public function processQueue()
    {
        // the decrement always happens; skip the rest if we're still at or
        // above the concurrency limit or there's no queued request waiting
        if (--$this->pending >= $this->limit || !$this->queue) {
            return;
        }

        // reset() rewinds the internal array pointer to the first queued
        // entry, so key() identifies exactly that entry for removal
        $first = \reset($this->queue);
        unset($this->queue[\key($this->queue)]);

        $first->resolve();
    }
}