diff options
Diffstat (limited to 'vendor/react/promise-stream')
-rw-r--r-- | vendor/react/promise-stream/LICENSE | 21 | ||||
-rw-r--r-- | vendor/react/promise-stream/composer.json | 47 | ||||
-rw-r--r-- | vendor/react/promise-stream/src/UnwrapReadableStream.php | 137 | ||||
-rw-r--r-- | vendor/react/promise-stream/src/UnwrapWritableStream.php | 164 | ||||
-rw-r--r-- | vendor/react/promise-stream/src/functions.php | 370 | ||||
-rw-r--r-- | vendor/react/promise-stream/src/functions_include.php | 7 |
6 files changed, 746 insertions, 0 deletions
diff --git a/vendor/react/promise-stream/LICENSE b/vendor/react/promise-stream/LICENSE new file mode 100644 index 0000000..25e7071 --- /dev/null +++ b/vendor/react/promise-stream/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2016 Christian Lück, Cees-Jan Kiewiet, Jan Sorgalla, Chris Boden + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is furnished +to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/vendor/react/promise-stream/composer.json b/vendor/react/promise-stream/composer.json new file mode 100644 index 0000000..ee3972c --- /dev/null +++ b/vendor/react/promise-stream/composer.json @@ -0,0 +1,47 @@ +{ + "name": "react/promise-stream", + "description": "The missing link between Promise-land and Stream-land for ReactPHP", + "keywords": ["unwrap", "stream", "buffer", "promise", "ReactPHP", "async"], + "homepage": "https://github.com/reactphp/promise-stream", + "license": "MIT", + "authors": [ + { + "name": "Christian Lück", + "homepage": "https://clue.engineering/", + "email": "christian@clue.engineering" + }, + { + "name": "Cees-Jan Kiewiet", + "homepage": "https://wyrihaximus.net/", + "email": "reactphp@ceesjankiewiet.nl" + }, + { + "name": "Jan Sorgalla", + "homepage": "https://sorgalla.com/", + "email": "jsorgalla@gmail.com" + }, + { + "name": "Chris Boden", + "homepage": "https://cboden.dev/", + "email": "cboden@gmail.com" + } + ], + "autoload": { + "psr-4": { "React\\Promise\\Stream\\" : "src/" }, + "files": [ "src/functions_include.php" ] + }, + "autoload-dev": { + "psr-4": { "React\\Tests\\Promise\\Stream\\": "tests/" } + }, + "require": { + "php": ">=5.3", + "react/stream": "^1.0 || ^0.7 || ^0.6 || ^0.5 || ^0.4.6", + "react/promise": "^2.1 || ^1.2" + }, + "require-dev": { + "react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3", + "react/promise-timer": "^1.0", + "clue/block-react": "^1.0", + "phpunit/phpunit": "^9.3 || ^5.7 || ^4.8.35" + } +} diff --git a/vendor/react/promise-stream/src/UnwrapReadableStream.php b/vendor/react/promise-stream/src/UnwrapReadableStream.php new file mode 100644 index 0000000..acd23be --- /dev/null +++ b/vendor/react/promise-stream/src/UnwrapReadableStream.php @@ -0,0 +1,137 @@ +<?php + +namespace React\Promise\Stream; + +use Evenement\EventEmitter; +use InvalidArgumentException; +use React\Promise\CancellablePromiseInterface; +use React\Promise\PromiseInterface; +use React\Stream\ReadableStreamInterface; +use React\Stream\Util; +use React\Stream\WritableStreamInterface; + +/** + * @internal + * @see unwrapReadable() instead + */ +class UnwrapReadableStream extends EventEmitter implements ReadableStreamInterface +{ + private $promise; + private $closed = false; + + /** + * Instantiate new unwrapped readable stream for given `Promise` which resolves with a `ReadableStreamInterface`. + * + * @param PromiseInterface $promise Promise<ReadableStreamInterface, Exception> + */ + public function __construct(PromiseInterface $promise) + { + $out = $this; + $closed =& $this->closed; + + $this->promise = $promise->then( + function ($stream) { + if (!$stream instanceof ReadableStreamInterface) { + throw new InvalidArgumentException('Not a readable stream'); + } + return $stream; + } + )->then( + function (ReadableStreamInterface $stream) use ($out, &$closed) { + // stream is already closed, make sure to close output stream + if (!$stream->isReadable()) { + $out->close(); + return $stream; + } + + // resolves but output is already closed, make sure to close stream silently + if ($closed) { + $stream->close(); + return $stream; + } + + // stream any writes into output stream + $stream->on('data', function ($data) use ($out) { + $out->emit('data', array($data, $out)); + }); + + // forward end events and close + $stream->on('end', function () use ($out, &$closed) { + if (!$closed) { + $out->emit('end', array($out)); + $out->close(); + } + }); + + // error events cancel output stream + $stream->on('error', function ($error) use ($out) { + $out->emit('error', array($error, $out)); + $out->close(); + }); + + // close both streams once either side closes + $stream->on('close', array($out, 'close')); + $out->on('close', array($stream, 'close')); + + return $stream; + }, + function ($e) use ($out, &$closed) { + if (!$closed) { + $out->emit('error', array($e, $out)); + $out->close(); + } + + // resume() and pause() may attach to this promise, so ensure we actually reject here + throw $e; + } + ); + } + + public function isReadable() + { + return !$this->closed; + } + + public function pause() + { + if ($this->promise !== null) { + $this->promise->then(function (ReadableStreamInterface $stream) { + $stream->pause(); + }); + } + } + + public function resume() + { + if ($this->promise !== null) { + $this->promise->then(function (ReadableStreamInterface $stream) { + $stream->resume(); + }); + } + } + + public function pipe(WritableStreamInterface $dest, array $options = array()) + { + Util::pipe($this, $dest, $options); + + return $dest; + } + + public function close() + { + if ($this->closed) { + return; + } + + $this->closed = true; + + // try to cancel promise once the stream closes + if ($this->promise instanceof CancellablePromiseInterface) { + $this->promise->cancel(); + } + $this->promise = null; + + $this->emit('close'); + $this->removeAllListeners(); + } +} diff --git a/vendor/react/promise-stream/src/UnwrapWritableStream.php b/vendor/react/promise-stream/src/UnwrapWritableStream.php new file mode 100644 index 0000000..f19e706 --- /dev/null +++ b/vendor/react/promise-stream/src/UnwrapWritableStream.php @@ -0,0 +1,164 @@ +<?php + +namespace React\Promise\Stream; + +use Evenement\EventEmitter; +use InvalidArgumentException; +use React\Promise\CancellablePromiseInterface; +use React\Promise\PromiseInterface; +use React\Stream\WritableStreamInterface; + +/** + * @internal + * @see unwrapWritable() instead + */ +class UnwrapWritableStream extends EventEmitter implements WritableStreamInterface +{ + private $promise; + private $stream; + private $buffer = array(); + private $closed = false; + private $ending = false; + + /** + * Instantiate new unwrapped writable stream for given `Promise` which resolves with a `WritableStreamInterface`. + * + * @param PromiseInterface $promise Promise<WritableStreamInterface, Exception> + */ + public function __construct(PromiseInterface $promise) + { + $out = $this; + $store =& $this->stream; + $buffer =& $this->buffer; + $ending =& $this->ending; + $closed =& $this->closed; + + $this->promise = $promise->then( + function ($stream) { + if (!$stream instanceof WritableStreamInterface) { + throw new InvalidArgumentException('Not a writable stream'); + } + return $stream; + } + )->then( + function (WritableStreamInterface $stream) use ($out, &$store, &$buffer, &$ending, &$closed) { + // stream is already closed, make sure to close output stream + if (!$stream->isWritable()) { + $out->close(); + return $stream; + } + + // resolves but output is already closed, make sure to close stream silently + if ($closed) { + $stream->close(); + return $stream; + } + + // forward drain events for back pressure + $stream->on('drain', function () use ($out) { + $out->emit('drain', array($out)); + }); + + // error events cancel output stream + $stream->on('error', function ($error) use ($out) { + $out->emit('error', array($error, $out)); + $out->close(); + }); + + // close both streams once either side closes + $stream->on('close', array($out, 'close')); + $out->on('close', array($stream, 'close')); + + if ($buffer) { + // flush buffer to stream and check if its buffer is not exceeded + $drained = true; + foreach ($buffer as $chunk) { + if (!$stream->write($chunk)) { + $drained = false; + } + } + $buffer = array(); + + if ($drained) { + // signal drain event, because the output stream previous signalled a full buffer + $out->emit('drain', array($out)); + } + } + + if ($ending) { + $stream->end(); + } else { + $store = $stream; + } + + return $stream; + }, + function ($e) use ($out, &$closed) { + if (!$closed) { + $out->emit('error', array($e, $out)); + $out->close(); + } + } + ); + } + + public function write($data) + { + if ($this->ending) { + return false; + } + + // forward to inner stream if possible + if ($this->stream !== null) { + return $this->stream->write($data); + } + + // append to buffer and signal the buffer is full + $this->buffer[] = $data; + return false; + } + + public function end($data = null) + { + if ($this->ending) { + return; + } + + $this->ending = true; + + // forward to inner stream if possible + if ($this->stream !== null) { + return $this->stream->end($data); + } + + // append to buffer + if ($data !== null) { + $this->buffer[] = $data; + } + } + + public function isWritable() + { + return !$this->ending; + } + + public function close() + { + if ($this->closed) { + return; + } + + $this->buffer = array(); + $this->ending = true; + $this->closed = true; + + // try to cancel promise once the stream closes + if ($this->promise instanceof CancellablePromiseInterface) { + $this->promise->cancel(); + } + $this->promise = $this->stream = null; + + $this->emit('close'); + $this->removeAllListeners(); + } +} diff --git a/vendor/react/promise-stream/src/functions.php b/vendor/react/promise-stream/src/functions.php new file mode 100644 index 0000000..da66de8 --- /dev/null +++ b/vendor/react/promise-stream/src/functions.php @@ -0,0 +1,370 @@ +<?php + +namespace React\Promise\Stream; + +use Evenement\EventEmitterInterface; +use React\Promise; +use React\Promise\PromiseInterface; +use React\Stream\ReadableStreamInterface; +use React\Stream\WritableStreamInterface; + +/** + * Create a `Promise` which will be fulfilled with the stream data buffer. + * + * ```php + * $stream = accessSomeJsonStream(); + * + * React\Promise\Stream\buffer($stream)->then(function (string $contents) { + * var_dump(json_decode($contents)); + * }); + * ``` + * + * The promise will be fulfilled with a `string` of all data chunks concatenated once the stream closes. + * + * The promise will be fulfilled with an empty `string` if the stream is already closed. + * + * The promise will be rejected with a `RuntimeException` if the stream emits an error. + * + * The promise will be rejected with a `RuntimeException` if it is cancelled. + * + * The optional `$maxLength` argument defaults to no limit. In case the maximum + * length is given and the stream emits more data before the end, the promise + * will be rejected with an `OverflowException`. + * + * ```php + * $stream = accessSomeToLargeStream(); + * + * React\Promise\Stream\buffer($stream, 1024)->then(function ($contents) { + * var_dump(json_decode($contents)); + * }, function ($error) { + * // Reaching here when the stream buffer goes above the max size, + * // in this example that is 1024 bytes, + * // or when the stream emits an error. + * }); + * ``` + * + * @param ReadableStreamInterface<string> $stream + * @param ?int $maxLength Maximum number of bytes to buffer or null for unlimited. + * @return PromiseInterface<string,\RuntimeException> + */ +function buffer(ReadableStreamInterface $stream, $maxLength = null) +{ + // stream already ended => resolve with empty buffer + if (!$stream->isReadable()) { + return Promise\resolve(''); + } + + $buffer = ''; + + $promise = new Promise\Promise(function ($resolve, $reject) use ($stream, $maxLength, &$buffer, &$bufferer) { + $bufferer = function ($data) use (&$buffer, $reject, $maxLength) { + $buffer .= $data; + + if ($maxLength !== null && isset($buffer[$maxLength])) { + $reject(new \OverflowException('Buffer exceeded maximum length')); + } + }; + + $stream->on('data', $bufferer); + + $stream->on('error', function (\Exception $e) use ($reject) { + $reject(new \RuntimeException( + 'An error occured on the underlying stream while buffering: ' . $e->getMessage(), + $e->getCode(), + $e + )); + }); + + $stream->on('close', function () use ($resolve, &$buffer) { + $resolve($buffer); + }); + }, function ($_, $reject) { + $reject(new \RuntimeException('Cancelled buffering')); + }); + + return $promise->then(null, function (\Exception $error) use (&$buffer, $bufferer, $stream) { + // promise rejected => clear buffer and buffering + $buffer = ''; + $stream->removeListener('data', $bufferer); + + throw $error; + }); +} + +/** + * Create a `Promise` which will be fulfilled once the given event triggers for the first time. + * + * ```php + * $stream = accessSomeJsonStream(); + * + * React\Promise\Stream\first($stream)->then(function (string $chunk) { + * echo 'The first chunk arrived: ' . $chunk; + * }); + * ``` + * + * The promise will be fulfilled with a `mixed` value of whatever the first event + * emitted or `null` if the event does not pass any data. + * If you do not pass a custom event name, then it will wait for the first "data" + * event. + * For common streams of type `ReadableStreamInterface<string>`, this means it will be + * fulfilled with a `string` containing the first data chunk. + * + * The promise will be rejected with a `RuntimeException` if the stream emits an error + * – unless you're waiting for the "error" event, in which case it will be fulfilled. + * + * The promise will be rejected with a `RuntimeException` once the stream closes + * – unless you're waiting for the "close" event, in which case it will be fulfilled. + * + * The promise will be rejected with a `RuntimeException` if the stream is already closed. + * + * The promise will be rejected with a `RuntimeException` if it is cancelled. + * + * @param ReadableStreamInterface|WritableStreamInterface $stream + * @param string $event + * @return PromiseInterface<mixed,\RuntimeException> + */ +function first(EventEmitterInterface $stream, $event = 'data') +{ + if ($stream instanceof ReadableStreamInterface) { + // readable or duplex stream not readable => already closed + // a half-open duplex stream is considered closed if its readable side is closed + if (!$stream->isReadable()) { + return Promise\reject(new \RuntimeException('Stream already closed')); + } + } elseif ($stream instanceof WritableStreamInterface) { + // writable-only stream (not duplex) not writable => already closed + if (!$stream->isWritable()) { + return Promise\reject(new \RuntimeException('Stream already closed')); + } + } + + return new Promise\Promise(function ($resolve, $reject) use ($stream, $event, &$listener) { + $listener = function ($data = null) use ($stream, $event, &$listener, $resolve) { + $stream->removeListener($event, $listener); + $resolve($data); + }; + $stream->on($event, $listener); + + if ($event !== 'error') { + $stream->on('error', function (\Exception $e) use ($stream, $event, $listener, $reject) { + $stream->removeListener($event, $listener); + $reject(new \RuntimeException( + 'An error occured on the underlying stream while waiting for event: ' . $e->getMessage(), + $e->getCode(), + $e + )); + }); + } + + $stream->on('close', function () use ($stream, $event, $listener, $reject) { + $stream->removeListener($event, $listener); + $reject(new \RuntimeException('Stream closed')); + }); + }, function ($_, $reject) use ($stream, $event, &$listener) { + $stream->removeListener($event, $listener); + $reject(new \RuntimeException('Operation cancelled')); + }); +} + +/** + * Create a `Promise` which will be fulfilled with an array of all the event data. + * + * ```php + * $stream = accessSomeJsonStream(); + * + * React\Promise\Stream\all($stream)->then(function (array $chunks) { + * echo 'The stream consists of ' . count($chunks) . ' chunk(s)'; + * }); + * ``` + * + * The promise will be fulfilled with an `array` once the stream closes. The array + * will contain whatever all events emitted or `null` values if the events do not pass any data. + * If you do not pass a custom event name, then it will wait for all the "data" + * events. + * For common streams of type `ReadableStreamInterface<string>`, this means it will be + * fulfilled with a `string[]` array containing all the data chunk. + * + * The promise will be fulfilled with an empty `array` if the stream is already closed. + * + * The promise will be rejected with a `RuntimeException` if the stream emits an error. + * + * The promise will be rejected with a `RuntimeException` if it is cancelled. + * + * @param ReadableStreamInterface|WritableStreamInterface $stream + * @param string $event + * @return PromiseInterface<array,\RuntimeException> + */ +function all(EventEmitterInterface $stream, $event = 'data') +{ + // stream already ended => resolve with empty buffer + if ($stream instanceof ReadableStreamInterface) { + // readable or duplex stream not readable => already closed + // a half-open duplex stream is considered closed if its readable side is closed + if (!$stream->isReadable()) { + return Promise\resolve(array()); + } + } elseif ($stream instanceof WritableStreamInterface) { + // writable-only stream (not duplex) not writable => already closed + if (!$stream->isWritable()) { + return Promise\resolve(array()); + } + } + + $buffer = array(); + $bufferer = function ($data = null) use (&$buffer) { + $buffer []= $data; + }; + $stream->on($event, $bufferer); + + $promise = new Promise\Promise(function ($resolve, $reject) use ($stream, &$buffer) { + $stream->on('error', function (\Exception $e) use ($reject) { + $reject(new \RuntimeException( + 'An error occured on the underlying stream while buffering: ' . $e->getMessage(), + $e->getCode(), + $e + )); + }); + + $stream->on('close', function () use ($resolve, &$buffer) { + $resolve($buffer); + }); + }, function ($_, $reject) { + $reject(new \RuntimeException('Cancelled buffering')); + }); + + return $promise->then(null, function ($error) use (&$buffer, $bufferer, $stream, $event) { + // promise rejected => clear buffer and buffering + $buffer = array(); + $stream->removeListener($event, $bufferer); + + throw $error; + }); +} + +/** + * Unwrap a `Promise` which will be fulfilled with a `ReadableStreamInterface<T>`. + * + * This function returns a readable stream instance (implementing `ReadableStreamInterface<T>`) + * right away which acts as a proxy for the future promise resolution. + * Once the given Promise will be fulfilled with a `ReadableStreamInterface<T>`, its + * data will be piped to the output stream. + * + * ```php + * //$promise = someFunctionWhichResolvesWithAStream(); + * $promise = startDownloadStream($uri); + * + * $stream = React\Promise\Stream\unwrapReadable($promise); + * + * $stream->on('data', function (string $data) { + * echo $data; + * }); + * + * $stream->on('end', function () { + * echo 'DONE'; + * }); + * ``` + * + * If the given promise is either rejected or fulfilled with anything but an + * instance of `ReadableStreamInterface`, then the output stream will emit + * an `error` event and close: + * + * ```php + * $promise = startDownloadStream($invalidUri); + * + * $stream = React\Promise\Stream\unwrapReadable($promise); + * + * $stream->on('error', function (Exception $error) { + * echo 'Error: ' . $error->getMessage(); + * }); + * ``` + * + * The given `$promise` SHOULD be pending, i.e. it SHOULD NOT be fulfilled or rejected + * at the time of invoking this function. + * If the given promise is already settled and does not fulfill with an instance of + * `ReadableStreamInterface`, then you will not be able to receive the `error` event. + * + * You can `close()` the resulting stream at any time, which will either try to + * `cancel()` the pending promise or try to `close()` the underlying stream. + * + * ```php + * $promise = startDownloadStream($uri); + * + * $stream = React\Promise\Stream\unwrapReadable($promise); + * + * $loop->addTimer(2.0, function () use ($stream) { + * $stream->close(); + * }); + * ``` + * + * @param PromiseInterface<ReadableStreamInterface<T>,\Exception> $promise + * @return ReadableStreamInterface<T> + */ +function unwrapReadable(PromiseInterface $promise) +{ + return new UnwrapReadableStream($promise); +} + +/** + * unwrap a `Promise` which will be fulfilled with a `WritableStreamInterface<T>`. + * + * This function returns a writable stream instance (implementing `WritableStreamInterface<T>`) + * right away which acts as a proxy for the future promise resolution. + * Any writes to this instance will be buffered in memory for when the promise will + * be fulfilled. + * Once the given Promise will be fulfilled with a `WritableStreamInterface<T>`, any + * data you have written to the proxy will be forwarded transparently to the inner + * stream. + * + * ```php + * //$promise = someFunctionWhichResolvesWithAStream(); + * $promise = startUploadStream($uri); + * + * $stream = React\Promise\Stream\unwrapWritable($promise); + * + * $stream->write('hello'); + * $stream->end('world'); + * + * $stream->on('close', function () { + * echo 'DONE'; + * }); + * ``` + * + * If the given promise is either rejected or fulfilled with anything but an + * instance of `WritableStreamInterface`, then the output stream will emit + * an `error` event and close: + * + * ```php + * $promise = startUploadStream($invalidUri); + * + * $stream = React\Promise\Stream\unwrapWritable($promise); + * + * $stream->on('error', function (Exception $error) { + * echo 'Error: ' . $error->getMessage(); + * }); + * ``` + * + * The given `$promise` SHOULD be pending, i.e. it SHOULD NOT be fulfilled or rejected + * at the time of invoking this function. + * If the given promise is already settled and does not fulfill with an instance of + * `WritableStreamInterface`, then you will not be able to receive the `error` event. + * + * You can `close()` the resulting stream at any time, which will either try to + * `cancel()` the pending promise or try to `close()` the underlying stream. + * + * ```php + * $promise = startUploadStream($uri); + * + * $stream = React\Promise\Stream\unwrapWritable($promise); + * + * $loop->addTimer(2.0, function () use ($stream) { + * $stream->close(); + * }); + * ``` + * + * @param PromiseInterface<WritableStreamInterface<T>,\Exception> $promise + * @return WritableStreamInterface<T> + */ +function unwrapWritable(PromiseInterface $promise) +{ + return new UnwrapWritableStream($promise); +} diff --git a/vendor/react/promise-stream/src/functions_include.php b/vendor/react/promise-stream/src/functions_include.php new file mode 100644 index 0000000..768a4fd --- /dev/null +++ b/vendor/react/promise-stream/src/functions_include.php @@ -0,0 +1,7 @@ +<?php + +namespace React\Promise\Stream; + +if (!function_exists('React\Promise\Stream\buffer')) { + require __DIR__ . '/functions.php'; +} |