summaryrefslogtreecommitdiffstats
path: root/vendor/react/http/src/Io/StreamingServer.php
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/react/http/src/Io/StreamingServer.php')
-rw-r--r--vendor/react/http/src/Io/StreamingServer.php383
1 files changed, 383 insertions, 0 deletions
diff --git a/vendor/react/http/src/Io/StreamingServer.php b/vendor/react/http/src/Io/StreamingServer.php
new file mode 100644
index 0000000..7818f0b
--- /dev/null
+++ b/vendor/react/http/src/Io/StreamingServer.php
@@ -0,0 +1,383 @@
+<?php
+
+namespace React\Http\Io;
+
+use Evenement\EventEmitter;
+use Psr\Http\Message\ResponseInterface;
+use Psr\Http\Message\ServerRequestInterface;
+use React\EventLoop\LoopInterface;
+use React\Http\Message\Response;
+use React\Http\Message\ServerRequest;
+use React\Promise;
+use React\Promise\CancellablePromiseInterface;
+use React\Promise\PromiseInterface;
+use React\Socket\ConnectionInterface;
+use React\Socket\ServerInterface;
+use React\Stream\ReadableStreamInterface;
+use React\Stream\WritableStreamInterface;
+
+/**
+ * The internal `StreamingServer` class is responsible for handling incoming connections and then
+ * processing each incoming HTTP request.
+ *
+ * Unlike the [`HttpServer`](#httpserver) class, it does not buffer and parse the incoming
+ * HTTP request body by default. This means that the request handler will be
+ * invoked with a streaming request body. Once the request headers have been
+ * received, it will invoke the request handler function. This request handler
+ * function needs to be passed to the constructor and will be invoked with the
+ * respective [request](#request) object and expects a [response](#response)
+ * object in return:
+ *
+ * ```php
+ * $server = new StreamingServer($loop, function (ServerRequestInterface $request) {
+ * return new Response(
+ * Response::STATUS_OK,
+ * array(
+ * 'Content-Type' => 'text/plain'
+ * ),
+ * "Hello World!\n"
+ * );
+ * });
+ * ```
+ *
+ * Each incoming HTTP request message is always represented by the
+ * [PSR-7 `ServerRequestInterface`](https://www.php-fig.org/psr/psr-7/#321-psrhttpmessageserverrequestinterface),
+ * see also following [request](#request) chapter for more details.
+ * Each outgoing HTTP response message is always represented by the
+ * [PSR-7 `ResponseInterface`](https://www.php-fig.org/psr/psr-7/#33-psrhttpmessageresponseinterface),
+ * see also following [response](#response) chapter for more details.
+ *
+ * In order to process any connections, the server needs to be attached to an
+ * instance of `React\Socket\ServerInterface` through the [`listen()`](#listen) method
+ * as described in the following chapter. In its most simple form, you can attach
+ * this to a [`React\Socket\SocketServer`](https://github.com/reactphp/socket#socketserver)
+ * in order to start a plaintext HTTP server like this:
+ *
+ * ```php
+ * $server = new StreamingServer($loop, $handler);
+ *
+ * $socket = new React\Socket\SocketServer('0.0.0.0:8080', array(), $loop);
+ * $server->listen($socket);
+ * ```
+ *
+ * See also the [`listen()`](#listen) method and the [first example](examples) for more details.
+ *
+ * The `StreamingServer` class is considered advanced usage and unless you know
+ * what you're doing, you're recommended to use the [`HttpServer`](#httpserver) class
+ * instead. The `StreamingServer` class is specifically designed to help with
+ * more advanced use cases where you want to have full control over consuming
+ * the incoming HTTP request body and concurrency settings.
+ *
+ * In particular, this class does not buffer and parse the incoming HTTP request
+ * in memory. It will invoke the request handler function once the HTTP request
+ * headers have been received, i.e. before receiving the potentially much larger
+ * HTTP request body. This means the [request](#request) passed to your request
+ * handler function may not be fully compatible with PSR-7. See also
+ * [streaming request](#streaming-request) below for more details.
+ *
+ * @see \React\Http\HttpServer
+ * @see \React\Http\Message\Response
+ * @see self::listen()
+ * @internal
+ */
+final class StreamingServer extends EventEmitter
+{
+ private $callback;
+ private $parser;
+ private $loop;
+
+ /**
+ * Creates an HTTP server that invokes the given callback for each incoming HTTP request
+ *
+ * In order to process any connections, the server needs to be attached to an
+ * instance of `React\Socket\ServerInterface` which emits underlying streaming
+ * connections in order to then parse incoming data as HTTP.
+ * See also [listen()](#listen) for more details.
+ *
+ * @param LoopInterface $loop
+ * @param callable $requestHandler
+ * @see self::listen()
+ */
+ public function __construct(LoopInterface $loop, $requestHandler)
+ {
+ if (!\is_callable($requestHandler)) {
+ throw new \InvalidArgumentException('Invalid request handler given');
+ }
+
+ $this->loop = $loop;
+
+ $this->callback = $requestHandler;
+ $this->parser = new RequestHeaderParser();
+
+ $that = $this;
+ $this->parser->on('headers', function (ServerRequestInterface $request, ConnectionInterface $conn) use ($that) {
+ $that->handleRequest($conn, $request);
+ });
+
+ $this->parser->on('error', function(\Exception $e, ConnectionInterface $conn) use ($that) {
+ $that->emit('error', array($e));
+
+ // parsing failed => assume dummy request and send appropriate error
+ $that->writeError(
+ $conn,
+ $e->getCode() !== 0 ? $e->getCode() : Response::STATUS_BAD_REQUEST,
+ new ServerRequest('GET', '/')
+ );
+ });
+ }
+
+ /**
+ * Starts listening for HTTP requests on the given socket server instance
+ *
+ * @param ServerInterface $socket
+ * @see \React\Http\HttpServer::listen()
+ */
+ public function listen(ServerInterface $socket)
+ {
+ $socket->on('connection', array($this->parser, 'handle'));
+ }
+
+ /** @internal */
+ public function handleRequest(ConnectionInterface $conn, ServerRequestInterface $request)
+ {
+ if ($request->getProtocolVersion() !== '1.0' && '100-continue' === \strtolower($request->getHeaderLine('Expect'))) {
+ $conn->write("HTTP/1.1 100 Continue\r\n\r\n");
+ }
+
+ // execute request handler callback
+ $callback = $this->callback;
+ try {
+ $response = $callback($request);
+ } catch (\Exception $error) {
+ // request handler callback throws an Exception
+ $response = Promise\reject($error);
+ } catch (\Throwable $error) { // @codeCoverageIgnoreStart
+ // request handler callback throws a PHP7+ Error
+ $response = Promise\reject($error); // @codeCoverageIgnoreEnd
+ }
+
+ // cancel pending promise once connection closes
+ if ($response instanceof CancellablePromiseInterface) {
+ $conn->on('close', function () use ($response) {
+ $response->cancel();
+ });
+ }
+
+ // happy path: response returned, handle and return immediately
+ if ($response instanceof ResponseInterface) {
+ return $this->handleResponse($conn, $request, $response);
+ }
+
+ // did not return a promise? this is an error, convert into one for rejection below.
+ if (!$response instanceof PromiseInterface) {
+ $response = Promise\resolve($response);
+ }
+
+ $that = $this;
+ $response->then(
+ function ($response) use ($that, $conn, $request) {
+ if (!$response instanceof ResponseInterface) {
+ $message = 'The response callback is expected to resolve with an object implementing Psr\Http\Message\ResponseInterface, but resolved with "%s" instead.';
+ $message = \sprintf($message, \is_object($response) ? \get_class($response) : \gettype($response));
+ $exception = new \RuntimeException($message);
+
+ $that->emit('error', array($exception));
+ return $that->writeError($conn, Response::STATUS_INTERNAL_SERVER_ERROR, $request);
+ }
+ $that->handleResponse($conn, $request, $response);
+ },
+ function ($error) use ($that, $conn, $request) {
+ $message = 'The response callback is expected to resolve with an object implementing Psr\Http\Message\ResponseInterface, but rejected with "%s" instead.';
+ $message = \sprintf($message, \is_object($error) ? \get_class($error) : \gettype($error));
+
+ $previous = null;
+
+ if ($error instanceof \Throwable || $error instanceof \Exception) {
+ $previous = $error;
+ }
+
+ $exception = new \RuntimeException($message, 0, $previous);
+
+ $that->emit('error', array($exception));
+ return $that->writeError($conn, Response::STATUS_INTERNAL_SERVER_ERROR, $request);
+ }
+ );
+ }
+
+ /** @internal */
+ public function writeError(ConnectionInterface $conn, $code, ServerRequestInterface $request)
+ {
+ $response = new Response(
+ $code,
+ array(
+ 'Content-Type' => 'text/plain',
+ 'Connection' => 'close' // we do not want to keep the connection open after an error
+ ),
+ 'Error ' . $code
+ );
+
+ // append reason phrase to response body if known
+ $reason = $response->getReasonPhrase();
+ if ($reason !== '') {
+ $body = $response->getBody();
+ $body->seek(0, SEEK_END);
+ $body->write(': ' . $reason);
+ }
+
+ $this->handleResponse($conn, $request, $response);
+ }
+
+
+ /** @internal */
+ public function handleResponse(ConnectionInterface $connection, ServerRequestInterface $request, ResponseInterface $response)
+ {
+ // return early and close response body if connection is already closed
+ $body = $response->getBody();
+ if (!$connection->isWritable()) {
+ $body->close();
+ return;
+ }
+
+ $code = $response->getStatusCode();
+ $method = $request->getMethod();
+
+ // assign HTTP protocol version from request automatically
+ $version = $request->getProtocolVersion();
+ $response = $response->withProtocolVersion($version);
+
+ // assign default "Server" header automatically
+ if (!$response->hasHeader('Server')) {
+ $response = $response->withHeader('Server', 'ReactPHP/1');
+ } elseif ($response->getHeaderLine('Server') === ''){
+ $response = $response->withoutHeader('Server');
+ }
+
+ // assign default "Date" header from current time automatically
+ if (!$response->hasHeader('Date')) {
+ // IMF-fixdate = day-name "," SP date1 SP time-of-day SP GMT
+ $response = $response->withHeader('Date', gmdate('D, d M Y H:i:s') . ' GMT');
+ } elseif ($response->getHeaderLine('Date') === ''){
+ $response = $response->withoutHeader('Date');
+ }
+
+ // assign "Content-Length" header automatically
+ $chunked = false;
+ if (($method === 'CONNECT' && $code >= 200 && $code < 300) || ($code >= 100 && $code < 200) || $code === Response::STATUS_NO_CONTENT) {
+ // 2xx response to CONNECT and 1xx and 204 MUST NOT include Content-Length or Transfer-Encoding header
+ $response = $response->withoutHeader('Content-Length');
+ } elseif ($code === Response::STATUS_NOT_MODIFIED && ($response->hasHeader('Content-Length') || $body->getSize() === 0)) {
+ // 304 Not Modified: preserve explicit Content-Length and preserve missing header if body is empty
+ } elseif ($body->getSize() !== null) {
+ // assign Content-Length header when using a "normal" buffered body string
+ $response = $response->withHeader('Content-Length', (string)$body->getSize());
+ } elseif (!$response->hasHeader('Content-Length') && $version === '1.1') {
+ // assign chunked transfer-encoding if no 'content-length' is given for HTTP/1.1 responses
+ $chunked = true;
+ }
+
+ // assign "Transfer-Encoding" header automatically
+ if ($chunked) {
+ $response = $response->withHeader('Transfer-Encoding', 'chunked');
+ } else {
+ // remove any Transfer-Encoding headers unless automatically enabled above
+ $response = $response->withoutHeader('Transfer-Encoding');
+ }
+
+ // assign "Connection" header automatically
+ $persist = false;
+ if ($code === Response::STATUS_SWITCHING_PROTOCOLS) {
+ // 101 (Switching Protocols) response uses Connection: upgrade header
+ // This implies that this stream now uses another protocol and we
+ // may not persist this connection for additional requests.
+ $response = $response->withHeader('Connection', 'upgrade');
+ } elseif (\strtolower($request->getHeaderLine('Connection')) === 'close' || \strtolower($response->getHeaderLine('Connection')) === 'close') {
+ // obey explicit "Connection: close" request header or response header if present
+ $response = $response->withHeader('Connection', 'close');
+ } elseif ($version === '1.1') {
+ // HTTP/1.1 assumes persistent connection support by default, so we don't need to inform client
+ $persist = true;
+ } elseif (strtolower($request->getHeaderLine('Connection')) === 'keep-alive') {
+ // obey explicit "Connection: keep-alive" request header and inform client
+ $persist = true;
+ $response = $response->withHeader('Connection', 'keep-alive');
+ } else {
+ // remove any Connection headers unless automatically enabled above
+ $response = $response->withoutHeader('Connection');
+ }
+
+ // 101 (Switching Protocols) response (for Upgrade request) forwards upgraded data through duplex stream
+ // 2xx (Successful) response to CONNECT forwards tunneled application data through duplex stream
+ if (($code === Response::STATUS_SWITCHING_PROTOCOLS || ($method === 'CONNECT' && $code >= 200 && $code < 300)) && $body instanceof HttpBodyStream && $body->input instanceof WritableStreamInterface) {
+ if ($request->getBody()->isReadable()) {
+ // request is still streaming => wait for request close before forwarding following data from connection
+ $request->getBody()->on('close', function () use ($connection, $body) {
+ if ($body->input->isWritable()) {
+ $connection->pipe($body->input);
+ $connection->resume();
+ }
+ });
+ } elseif ($body->input->isWritable()) {
+ // request already closed => forward following data from connection
+ $connection->pipe($body->input);
+ $connection->resume();
+ }
+ }
+
+ // build HTTP response header by appending status line and header fields
+ $headers = "HTTP/" . $version . " " . $code . " " . $response->getReasonPhrase() . "\r\n";
+ foreach ($response->getHeaders() as $name => $values) {
+ foreach ($values as $value) {
+ $headers .= $name . ": " . $value . "\r\n";
+ }
+ }
+
+ // response to HEAD and 1xx, 204 and 304 responses MUST NOT include a body
+ // exclude status 101 (Switching Protocols) here for Upgrade request handling above
+ if ($method === 'HEAD' || ($code >= 100 && $code < 200 && $code !== Response::STATUS_SWITCHING_PROTOCOLS) || $code === Response::STATUS_NO_CONTENT || $code === Response::STATUS_NOT_MODIFIED) {
+ $body->close();
+ $body = '';
+ }
+
+ // this is a non-streaming response body or the body stream already closed?
+ if (!$body instanceof ReadableStreamInterface || !$body->isReadable()) {
+ // add final chunk if a streaming body is already closed and uses `Transfer-Encoding: chunked`
+ if ($body instanceof ReadableStreamInterface && $chunked) {
+ $body = "0\r\n\r\n";
+ }
+
+ // write response headers and body
+ $connection->write($headers . "\r\n" . $body);
+
+ // either wait for next request over persistent connection or end connection
+ if ($persist) {
+ $this->parser->handle($connection);
+ } else {
+ $connection->end();
+ }
+ return;
+ }
+
+ $connection->write($headers . "\r\n");
+
+ if ($chunked) {
+ $body = new ChunkedEncoder($body);
+ }
+
+ // Close response stream once connection closes.
+ // Note that this TCP/IP close detection may take some time,
+ // in particular this may only fire on a later read/write attempt.
+ $connection->on('close', array($body, 'close'));
+
+ // write streaming body and then wait for next request over persistent connection
+ if ($persist) {
+ $body->pipe($connection, array('end' => false));
+ $parser = $this->parser;
+ $body->on('end', function () use ($connection, $parser, $body) {
+ $connection->removeListener('close', array($body, 'close'));
+ $parser->handle($connection);
+ });
+ } else {
+ $body->pipe($connection);
+ }
+ }
+}