diff options
Diffstat (limited to 'vendor/react/promise-stream/src')
-rw-r--r-- | vendor/react/promise-stream/src/UnwrapReadableStream.php | 137 | ||||
-rw-r--r-- | vendor/react/promise-stream/src/UnwrapWritableStream.php | 164 | ||||
-rw-r--r-- | vendor/react/promise-stream/src/functions.php | 370 | ||||
-rw-r--r-- | vendor/react/promise-stream/src/functions_include.php | 7 |
4 files changed, 678 insertions, 0 deletions
diff --git a/vendor/react/promise-stream/src/UnwrapReadableStream.php b/vendor/react/promise-stream/src/UnwrapReadableStream.php new file mode 100644 index 0000000..acd23be --- /dev/null +++ b/vendor/react/promise-stream/src/UnwrapReadableStream.php @@ -0,0 +1,137 @@ +<?php + +namespace React\Promise\Stream; + +use Evenement\EventEmitter; +use InvalidArgumentException; +use React\Promise\CancellablePromiseInterface; +use React\Promise\PromiseInterface; +use React\Stream\ReadableStreamInterface; +use React\Stream\Util; +use React\Stream\WritableStreamInterface; + +/** + * @internal + * @see unwrapReadable() instead + */ +class UnwrapReadableStream extends EventEmitter implements ReadableStreamInterface +{ + private $promise; + private $closed = false; + + /** + * Instantiate new unwrapped readable stream for given `Promise` which resolves with a `ReadableStreamInterface`. + * + * @param PromiseInterface $promise Promise<ReadableStreamInterface, Exception> + */ + public function __construct(PromiseInterface $promise) + { + $out = $this; + $closed =& $this->closed; + + $this->promise = $promise->then( + function ($stream) { + if (!$stream instanceof ReadableStreamInterface) { + throw new InvalidArgumentException('Not a readable stream'); + } + return $stream; + } + )->then( + function (ReadableStreamInterface $stream) use ($out, &$closed) { + // stream is already closed, make sure to close output stream + if (!$stream->isReadable()) { + $out->close(); + return $stream; + } + + // resolves but output is already closed, make sure to close stream silently + if ($closed) { + $stream->close(); + return $stream; + } + + // stream any writes into output stream + $stream->on('data', function ($data) use ($out) { + $out->emit('data', array($data, $out)); + }); + + // forward end events and close + $stream->on('end', function () use ($out, &$closed) { + if (!$closed) { + $out->emit('end', array($out)); + $out->close(); + } + }); + + // error events cancel output stream + $stream->on('error', function ($error) use ($out) { + $out->emit('error', array($error, $out)); + $out->close(); + }); + + // close both streams once either side closes + $stream->on('close', array($out, 'close')); + $out->on('close', array($stream, 'close')); + + return $stream; + }, + function ($e) use ($out, &$closed) { + if (!$closed) { + $out->emit('error', array($e, $out)); + $out->close(); + } + + // resume() and pause() may attach to this promise, so ensure we actually reject here + throw $e; + } + ); + } + + public function isReadable() + { + return !$this->closed; + } + + public function pause() + { + if ($this->promise !== null) { + $this->promise->then(function (ReadableStreamInterface $stream) { + $stream->pause(); + }); + } + } + + public function resume() + { + if ($this->promise !== null) { + $this->promise->then(function (ReadableStreamInterface $stream) { + $stream->resume(); + }); + } + } + + public function pipe(WritableStreamInterface $dest, array $options = array()) + { + Util::pipe($this, $dest, $options); + + return $dest; + } + + public function close() + { + if ($this->closed) { + return; + } + + $this->closed = true; + + // try to cancel promise once the stream closes + if ($this->promise instanceof CancellablePromiseInterface) { + $this->promise->cancel(); + } + $this->promise = null; + + $this->emit('close'); + $this->removeAllListeners(); + } +} diff --git a/vendor/react/promise-stream/src/UnwrapWritableStream.php b/vendor/react/promise-stream/src/UnwrapWritableStream.php new file mode 100644 index 0000000..f19e706 --- /dev/null +++ b/vendor/react/promise-stream/src/UnwrapWritableStream.php @@ -0,0 +1,164 @@ +<?php + +namespace React\Promise\Stream; + +use Evenement\EventEmitter; +use InvalidArgumentException; +use React\Promise\CancellablePromiseInterface; +use React\Promise\PromiseInterface; +use React\Stream\WritableStreamInterface; + +/** + * @internal + * @see unwrapWritable() instead + */ +class UnwrapWritableStream extends EventEmitter implements WritableStreamInterface +{ + private $promise; + private $stream; + private $buffer = array(); + private $closed = false; + private $ending = false; + + /** + * Instantiate new unwrapped writable stream for given `Promise` which resolves with a `WritableStreamInterface`. + * + * @param PromiseInterface $promise Promise<WritableStreamInterface, Exception> + */ + public function __construct(PromiseInterface $promise) + { + $out = $this; + $store =& $this->stream; + $buffer =& $this->buffer; + $ending =& $this->ending; + $closed =& $this->closed; + + $this->promise = $promise->then( + function ($stream) { + if (!$stream instanceof WritableStreamInterface) { + throw new InvalidArgumentException('Not a writable stream'); + } + return $stream; + } + )->then( + function (WritableStreamInterface $stream) use ($out, &$store, &$buffer, &$ending, &$closed) { + // stream is already closed, make sure to close output stream + if (!$stream->isWritable()) { + $out->close(); + return $stream; + } + + // resolves but output is already closed, make sure to close stream silently + if ($closed) { + $stream->close(); + return $stream; + } + + // forward drain events for back pressure + $stream->on('drain', function () use ($out) { + $out->emit('drain', array($out)); + }); + + // error events cancel output stream + $stream->on('error', function ($error) use ($out) { + $out->emit('error', array($error, $out)); + $out->close(); + }); + + // close both streams once either side closes + $stream->on('close', array($out, 'close')); + $out->on('close', array($stream, 'close')); + + if ($buffer) { + // flush buffer to stream and check if its buffer is not exceeded + $drained = true; + foreach ($buffer as $chunk) { + if (!$stream->write($chunk)) { + $drained = false; + } + } + $buffer = array(); + + if ($drained) { + // signal drain event, because the output stream previous signalled a full buffer + $out->emit('drain', array($out)); + } + } + + if ($ending) { + $stream->end(); + } else { + $store = $stream; + } + + return $stream; + }, + function ($e) use ($out, &$closed) { + if (!$closed) { + $out->emit('error', array($e, $out)); + $out->close(); + } + } + ); + } + + public function write($data) + { + if ($this->ending) { + return false; + } + + // forward to inner stream if possible + if ($this->stream !== null) { + return $this->stream->write($data); + } + + // append to buffer and signal the buffer is full + $this->buffer[] = $data; + return false; + } + + public function end($data = null) + { + if ($this->ending) { + return; + } + + $this->ending = true; + + // forward to inner stream if possible + if ($this->stream !== null) { + return $this->stream->end($data); + } + + // append to buffer + if ($data !== null) { + $this->buffer[] = $data; + } + } + + public function isWritable() + { + return !$this->ending; + } + + public function close() + { + if ($this->closed) { + return; + } + + $this->buffer = array(); + $this->ending = true; + $this->closed = true; + + // try to cancel promise once the stream closes + if ($this->promise instanceof CancellablePromiseInterface) { + $this->promise->cancel(); + } + $this->promise = $this->stream = null; + + $this->emit('close'); + $this->removeAllListeners(); + } +} diff --git a/vendor/react/promise-stream/src/functions.php b/vendor/react/promise-stream/src/functions.php new file mode 100644 index 0000000..da66de8 --- /dev/null +++ b/vendor/react/promise-stream/src/functions.php @@ -0,0 +1,370 @@ +<?php + +namespace React\Promise\Stream; + +use Evenement\EventEmitterInterface; +use React\Promise; +use React\Promise\PromiseInterface; +use React\Stream\ReadableStreamInterface; +use React\Stream\WritableStreamInterface; + +/** + * Create a `Promise` which will be fulfilled with the stream data buffer. + * + * ```php + * $stream = accessSomeJsonStream(); + * + * React\Promise\Stream\buffer($stream)->then(function (string $contents) { + * var_dump(json_decode($contents)); + * }); + * ``` + * + * The promise will be fulfilled with a `string` of all data chunks concatenated once the stream closes. + * + * The promise will be fulfilled with an empty `string` if the stream is already closed. + * + * The promise will be rejected with a `RuntimeException` if the stream emits an error. + * + * The promise will be rejected with a `RuntimeException` if it is cancelled. + * + * The optional `$maxLength` argument defaults to no limit. In case the maximum + * length is given and the stream emits more data before the end, the promise + * will be rejected with an `OverflowException`. + * + * ```php + * $stream = accessSomeToLargeStream(); + * + * React\Promise\Stream\buffer($stream, 1024)->then(function ($contents) { + * var_dump(json_decode($contents)); + * }, function ($error) { + * // Reaching here when the stream buffer goes above the max size, + * // in this example that is 1024 bytes, + * // or when the stream emits an error. + * }); + * ``` + * + * @param ReadableStreamInterface<string> $stream + * @param ?int $maxLength Maximum number of bytes to buffer or null for unlimited. + * @return PromiseInterface<string,\RuntimeException> + */ +function buffer(ReadableStreamInterface $stream, $maxLength = null) +{ + // stream already ended => resolve with empty buffer + if (!$stream->isReadable()) { + return Promise\resolve(''); + } + + $buffer = ''; + + $promise = new Promise\Promise(function ($resolve, $reject) use ($stream, $maxLength, &$buffer, &$bufferer) { + $bufferer = function ($data) use (&$buffer, $reject, $maxLength) { + $buffer .= $data; + + if ($maxLength !== null && isset($buffer[$maxLength])) { + $reject(new \OverflowException('Buffer exceeded maximum length')); + } + }; + + $stream->on('data', $bufferer); + + $stream->on('error', function (\Exception $e) use ($reject) { + $reject(new \RuntimeException( + 'An error occured on the underlying stream while buffering: ' . $e->getMessage(), + $e->getCode(), + $e + )); + }); + + $stream->on('close', function () use ($resolve, &$buffer) { + $resolve($buffer); + }); + }, function ($_, $reject) { + $reject(new \RuntimeException('Cancelled buffering')); + }); + + return $promise->then(null, function (\Exception $error) use (&$buffer, $bufferer, $stream) { + // promise rejected => clear buffer and buffering + $buffer = ''; + $stream->removeListener('data', $bufferer); + + throw $error; + }); +} + +/** + * Create a `Promise` which will be fulfilled once the given event triggers for the first time. + * + * ```php + * $stream = accessSomeJsonStream(); + * + * React\Promise\Stream\first($stream)->then(function (string $chunk) { + * echo 'The first chunk arrived: ' . $chunk; + * }); + * ``` + * + * The promise will be fulfilled with a `mixed` value of whatever the first event + * emitted or `null` if the event does not pass any data. + * If you do not pass a custom event name, then it will wait for the first "data" + * event. + * For common streams of type `ReadableStreamInterface<string>`, this means it will be + * fulfilled with a `string` containing the first data chunk. + * + * The promise will be rejected with a `RuntimeException` if the stream emits an error + * – unless you're waiting for the "error" event, in which case it will be fulfilled. + * + * The promise will be rejected with a `RuntimeException` once the stream closes + * – unless you're waiting for the "close" event, in which case it will be fulfilled. + * + * The promise will be rejected with a `RuntimeException` if the stream is already closed. + * + * The promise will be rejected with a `RuntimeException` if it is cancelled. + * + * @param ReadableStreamInterface|WritableStreamInterface $stream + * @param string $event + * @return PromiseInterface<mixed,\RuntimeException> + */ +function first(EventEmitterInterface $stream, $event = 'data') +{ + if ($stream instanceof ReadableStreamInterface) { + // readable or duplex stream not readable => already closed + // a half-open duplex stream is considered closed if its readable side is closed + if (!$stream->isReadable()) { + return Promise\reject(new \RuntimeException('Stream already closed')); + } + } elseif ($stream instanceof WritableStreamInterface) { + // writable-only stream (not duplex) not writable => already closed + if (!$stream->isWritable()) { + return Promise\reject(new \RuntimeException('Stream already closed')); + } + } + + return new Promise\Promise(function ($resolve, $reject) use ($stream, $event, &$listener) { + $listener = function ($data = null) use ($stream, $event, &$listener, $resolve) { + $stream->removeListener($event, $listener); + $resolve($data); + }; + $stream->on($event, $listener); + + if ($event !== 'error') { + $stream->on('error', function (\Exception $e) use ($stream, $event, $listener, $reject) { + $stream->removeListener($event, $listener); + $reject(new \RuntimeException( + 'An error occured on the underlying stream while waiting for event: ' . $e->getMessage(), + $e->getCode(), + $e + )); + }); + } + + $stream->on('close', function () use ($stream, $event, $listener, $reject) { + $stream->removeListener($event, $listener); + $reject(new \RuntimeException('Stream closed')); + }); + }, function ($_, $reject) use ($stream, $event, &$listener) { + $stream->removeListener($event, $listener); + $reject(new \RuntimeException('Operation cancelled')); + }); +} + +/** + * Create a `Promise` which will be fulfilled with an array of all the event data. + * + * ```php + * $stream = accessSomeJsonStream(); + * + * React\Promise\Stream\all($stream)->then(function (array $chunks) { + * echo 'The stream consists of ' . count($chunks) . ' chunk(s)'; + * }); + * ``` + * + * The promise will be fulfilled with an `array` once the stream closes. The array + * will contain whatever all events emitted or `null` values if the events do not pass any data. + * If you do not pass a custom event name, then it will wait for all the "data" + * events. + * For common streams of type `ReadableStreamInterface<string>`, this means it will be + * fulfilled with a `string[]` array containing all the data chunk. + * + * The promise will be fulfilled with an empty `array` if the stream is already closed. + * + * The promise will be rejected with a `RuntimeException` if the stream emits an error. + * + * The promise will be rejected with a `RuntimeException` if it is cancelled. + * + * @param ReadableStreamInterface|WritableStreamInterface $stream + * @param string $event + * @return PromiseInterface<array,\RuntimeException> + */ +function all(EventEmitterInterface $stream, $event = 'data') +{ + // stream already ended => resolve with empty buffer + if ($stream instanceof ReadableStreamInterface) { + // readable or duplex stream not readable => already closed + // a half-open duplex stream is considered closed if its readable side is closed + if (!$stream->isReadable()) { + return Promise\resolve(array()); + } + } elseif ($stream instanceof WritableStreamInterface) { + // writable-only stream (not duplex) not writable => already closed + if (!$stream->isWritable()) { + return Promise\resolve(array()); + } + } + + $buffer = array(); + $bufferer = function ($data = null) use (&$buffer) { + $buffer []= $data; + }; + $stream->on($event, $bufferer); + + $promise = new Promise\Promise(function ($resolve, $reject) use ($stream, &$buffer) { + $stream->on('error', function (\Exception $e) use ($reject) { + $reject(new \RuntimeException( + 'An error occured on the underlying stream while buffering: ' . $e->getMessage(), + $e->getCode(), + $e + )); + }); + + $stream->on('close', function () use ($resolve, &$buffer) { + $resolve($buffer); + }); + }, function ($_, $reject) { + $reject(new \RuntimeException('Cancelled buffering')); + }); + + return $promise->then(null, function ($error) use (&$buffer, $bufferer, $stream, $event) { + // promise rejected => clear buffer and buffering + $buffer = array(); + $stream->removeListener($event, $bufferer); + + throw $error; + }); +} + +/** + * Unwrap a `Promise` which will be fulfilled with a `ReadableStreamInterface<T>`. + * + * This function returns a readable stream instance (implementing `ReadableStreamInterface<T>`) + * right away which acts as a proxy for the future promise resolution. + * Once the given Promise will be fulfilled with a `ReadableStreamInterface<T>`, its + * data will be piped to the output stream. + * + * ```php + * //$promise = someFunctionWhichResolvesWithAStream(); + * $promise = startDownloadStream($uri); + * + * $stream = React\Promise\Stream\unwrapReadable($promise); + * + * $stream->on('data', function (string $data) { + * echo $data; + * }); + * + * $stream->on('end', function () { + * echo 'DONE'; + * }); + * ``` + * + * If the given promise is either rejected or fulfilled with anything but an + * instance of `ReadableStreamInterface`, then the output stream will emit + * an `error` event and close: + * + * ```php + * $promise = startDownloadStream($invalidUri); + * + * $stream = React\Promise\Stream\unwrapReadable($promise); + * + * $stream->on('error', function (Exception $error) { + * echo 'Error: ' . $error->getMessage(); + * }); + * ``` + * + * The given `$promise` SHOULD be pending, i.e. it SHOULD NOT be fulfilled or rejected + * at the time of invoking this function. + * If the given promise is already settled and does not fulfill with an instance of + * `ReadableStreamInterface`, then you will not be able to receive the `error` event. + * + * You can `close()` the resulting stream at any time, which will either try to + * `cancel()` the pending promise or try to `close()` the underlying stream. + * + * ```php + * $promise = startDownloadStream($uri); + * + * $stream = React\Promise\Stream\unwrapReadable($promise); + * + * $loop->addTimer(2.0, function () use ($stream) { + * $stream->close(); + * }); + * ``` + * + * @param PromiseInterface<ReadableStreamInterface<T>,\Exception> $promise + * @return ReadableStreamInterface<T> + */ +function unwrapReadable(PromiseInterface $promise) +{ + return new UnwrapReadableStream($promise); +} + +/** + * unwrap a `Promise` which will be fulfilled with a `WritableStreamInterface<T>`. + * + * This function returns a writable stream instance (implementing `WritableStreamInterface<T>`) + * right away which acts as a proxy for the future promise resolution. + * Any writes to this instance will be buffered in memory for when the promise will + * be fulfilled. + * Once the given Promise will be fulfilled with a `WritableStreamInterface<T>`, any + * data you have written to the proxy will be forwarded transparently to the inner + * stream. + * + * ```php + * //$promise = someFunctionWhichResolvesWithAStream(); + * $promise = startUploadStream($uri); + * + * $stream = React\Promise\Stream\unwrapWritable($promise); + * + * $stream->write('hello'); + * $stream->end('world'); + * + * $stream->on('close', function () { + * echo 'DONE'; + * }); + * ``` + * + * If the given promise is either rejected or fulfilled with anything but an + * instance of `WritableStreamInterface`, then the output stream will emit + * an `error` event and close: + * + * ```php + * $promise = startUploadStream($invalidUri); + * + * $stream = React\Promise\Stream\unwrapWritable($promise); + * + * $stream->on('error', function (Exception $error) { + * echo 'Error: ' . $error->getMessage(); + * }); + * ``` + * + * The given `$promise` SHOULD be pending, i.e. it SHOULD NOT be fulfilled or rejected + * at the time of invoking this function. + * If the given promise is already settled and does not fulfill with an instance of + * `WritableStreamInterface`, then you will not be able to receive the `error` event. + * + * You can `close()` the resulting stream at any time, which will either try to + * `cancel()` the pending promise or try to `close()` the underlying stream. + * + * ```php + * $promise = startUploadStream($uri); + * + * $stream = React\Promise\Stream\unwrapWritable($promise); + * + * $loop->addTimer(2.0, function () use ($stream) { + * $stream->close(); + * }); + * ``` + * + * @param PromiseInterface<WritableStreamInterface<T>,\Exception> $promise + * @return WritableStreamInterface<T> + */ +function unwrapWritable(PromiseInterface $promise) +{ + return new UnwrapWritableStream($promise); +} diff --git a/vendor/react/promise-stream/src/functions_include.php b/vendor/react/promise-stream/src/functions_include.php new file mode 100644 index 0000000..768a4fd --- /dev/null +++ b/vendor/react/promise-stream/src/functions_include.php @@ -0,0 +1,7 @@ +<?php + +namespace React\Promise\Stream; + +if (!function_exists('React\Promise\Stream\buffer')) { + require __DIR__ . '/functions.php'; +} |