summaryrefslogtreecommitdiffstats
path: root/vendor/react/promise-stream
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/react/promise-stream')
-rw-r--r--vendor/react/promise-stream/LICENSE21
-rw-r--r--vendor/react/promise-stream/composer.json47
-rw-r--r--vendor/react/promise-stream/src/UnwrapReadableStream.php137
-rw-r--r--vendor/react/promise-stream/src/UnwrapWritableStream.php164
-rw-r--r--vendor/react/promise-stream/src/functions.php370
-rw-r--r--vendor/react/promise-stream/src/functions_include.php7
6 files changed, 746 insertions, 0 deletions
diff --git a/vendor/react/promise-stream/LICENSE b/vendor/react/promise-stream/LICENSE
new file mode 100644
index 0000000..25e7071
--- /dev/null
+++ b/vendor/react/promise-stream/LICENSE
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+
+Copyright (c) 2016 Christian Lück, Cees-Jan Kiewiet, Jan Sorgalla, Chris Boden
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is furnished
+to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/vendor/react/promise-stream/composer.json b/vendor/react/promise-stream/composer.json
new file mode 100644
index 0000000..ee3972c
--- /dev/null
+++ b/vendor/react/promise-stream/composer.json
@@ -0,0 +1,47 @@
+{
+ "name": "react/promise-stream",
+ "description": "The missing link between Promise-land and Stream-land for ReactPHP",
+ "keywords": ["unwrap", "stream", "buffer", "promise", "ReactPHP", "async"],
+ "homepage": "https://github.com/reactphp/promise-stream",
+ "license": "MIT",
+ "authors": [
+ {
+ "name": "Christian Lück",
+ "homepage": "https://clue.engineering/",
+ "email": "christian@clue.engineering"
+ },
+ {
+ "name": "Cees-Jan Kiewiet",
+ "homepage": "https://wyrihaximus.net/",
+ "email": "reactphp@ceesjankiewiet.nl"
+ },
+ {
+ "name": "Jan Sorgalla",
+ "homepage": "https://sorgalla.com/",
+ "email": "jsorgalla@gmail.com"
+ },
+ {
+ "name": "Chris Boden",
+ "homepage": "https://cboden.dev/",
+ "email": "cboden@gmail.com"
+ }
+ ],
+ "autoload": {
+ "psr-4": { "React\\Promise\\Stream\\" : "src/" },
+ "files": [ "src/functions_include.php" ]
+ },
+ "autoload-dev": {
+ "psr-4": { "React\\Tests\\Promise\\Stream\\": "tests/" }
+ },
+ "require": {
+ "php": ">=5.3",
+ "react/stream": "^1.0 || ^0.7 || ^0.6 || ^0.5 || ^0.4.6",
+ "react/promise": "^2.1 || ^1.2"
+ },
+ "require-dev": {
+ "react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3",
+ "react/promise-timer": "^1.0",
+ "clue/block-react": "^1.0",
+ "phpunit/phpunit": "^9.3 || ^5.7 || ^4.8.35"
+ }
+}
diff --git a/vendor/react/promise-stream/src/UnwrapReadableStream.php b/vendor/react/promise-stream/src/UnwrapReadableStream.php
new file mode 100644
index 0000000..acd23be
--- /dev/null
+++ b/vendor/react/promise-stream/src/UnwrapReadableStream.php
@@ -0,0 +1,137 @@
+<?php
+
+namespace React\Promise\Stream;
+
+use Evenement\EventEmitter;
+use InvalidArgumentException;
+use React\Promise\CancellablePromiseInterface;
+use React\Promise\PromiseInterface;
+use React\Stream\ReadableStreamInterface;
+use React\Stream\Util;
+use React\Stream\WritableStreamInterface;
+
+/**
+ * @internal
+ * @see unwrapReadable() instead
+ */
+class UnwrapReadableStream extends EventEmitter implements ReadableStreamInterface
+{
+ private $promise;
+ private $closed = false;
+
+ /**
+ * Instantiate new unwrapped readable stream for given `Promise` which resolves with a `ReadableStreamInterface`.
+ *
+ * @param PromiseInterface $promise Promise<ReadableStreamInterface, Exception>
+ */
+ public function __construct(PromiseInterface $promise)
+ {
+ $out = $this;
+ $closed =& $this->closed;
+
+ $this->promise = $promise->then(
+ function ($stream) {
+ if (!$stream instanceof ReadableStreamInterface) {
+ throw new InvalidArgumentException('Not a readable stream');
+ }
+ return $stream;
+ }
+ )->then(
+ function (ReadableStreamInterface $stream) use ($out, &$closed) {
+ // stream is already closed, make sure to close output stream
+ if (!$stream->isReadable()) {
+ $out->close();
+ return $stream;
+ }
+
+ // resolves but output is already closed, make sure to close stream silently
+ if ($closed) {
+ $stream->close();
+ return $stream;
+ }
+
+ // stream any writes into output stream
+ $stream->on('data', function ($data) use ($out) {
+ $out->emit('data', array($data, $out));
+ });
+
+ // forward end events and close
+ $stream->on('end', function () use ($out, &$closed) {
+ if (!$closed) {
+ $out->emit('end', array($out));
+ $out->close();
+ }
+ });
+
+ // error events cancel output stream
+ $stream->on('error', function ($error) use ($out) {
+ $out->emit('error', array($error, $out));
+ $out->close();
+ });
+
+ // close both streams once either side closes
+ $stream->on('close', array($out, 'close'));
+ $out->on('close', array($stream, 'close'));
+
+ return $stream;
+ },
+ function ($e) use ($out, &$closed) {
+ if (!$closed) {
+ $out->emit('error', array($e, $out));
+ $out->close();
+ }
+
+ // resume() and pause() may attach to this promise, so ensure we actually reject here
+ throw $e;
+ }
+ );
+ }
+
+ public function isReadable()
+ {
+ return !$this->closed;
+ }
+
+ public function pause()
+ {
+ if ($this->promise !== null) {
+ $this->promise->then(function (ReadableStreamInterface $stream) {
+ $stream->pause();
+ });
+ }
+ }
+
+ public function resume()
+ {
+ if ($this->promise !== null) {
+ $this->promise->then(function (ReadableStreamInterface $stream) {
+ $stream->resume();
+ });
+ }
+ }
+
+ public function pipe(WritableStreamInterface $dest, array $options = array())
+ {
+ Util::pipe($this, $dest, $options);
+
+ return $dest;
+ }
+
+ public function close()
+ {
+ if ($this->closed) {
+ return;
+ }
+
+ $this->closed = true;
+
+ // try to cancel promise once the stream closes
+ if ($this->promise instanceof CancellablePromiseInterface) {
+ $this->promise->cancel();
+ }
+ $this->promise = null;
+
+ $this->emit('close');
+ $this->removeAllListeners();
+ }
+}
diff --git a/vendor/react/promise-stream/src/UnwrapWritableStream.php b/vendor/react/promise-stream/src/UnwrapWritableStream.php
new file mode 100644
index 0000000..f19e706
--- /dev/null
+++ b/vendor/react/promise-stream/src/UnwrapWritableStream.php
@@ -0,0 +1,164 @@
+<?php
+
+namespace React\Promise\Stream;
+
+use Evenement\EventEmitter;
+use InvalidArgumentException;
+use React\Promise\CancellablePromiseInterface;
+use React\Promise\PromiseInterface;
+use React\Stream\WritableStreamInterface;
+
+/**
+ * @internal
+ * @see unwrapWritable() instead
+ */
+class UnwrapWritableStream extends EventEmitter implements WritableStreamInterface
+{
+ private $promise;
+ private $stream;
+ private $buffer = array();
+ private $closed = false;
+ private $ending = false;
+
+ /**
+ * Instantiate new unwrapped writable stream for given `Promise` which resolves with a `WritableStreamInterface`.
+ *
+ * @param PromiseInterface $promise Promise<WritableStreamInterface, Exception>
+ */
+ public function __construct(PromiseInterface $promise)
+ {
+ $out = $this;
+ $store =& $this->stream;
+ $buffer =& $this->buffer;
+ $ending =& $this->ending;
+ $closed =& $this->closed;
+
+ $this->promise = $promise->then(
+ function ($stream) {
+ if (!$stream instanceof WritableStreamInterface) {
+ throw new InvalidArgumentException('Not a writable stream');
+ }
+ return $stream;
+ }
+ )->then(
+ function (WritableStreamInterface $stream) use ($out, &$store, &$buffer, &$ending, &$closed) {
+ // stream is already closed, make sure to close output stream
+ if (!$stream->isWritable()) {
+ $out->close();
+ return $stream;
+ }
+
+ // resolves but output is already closed, make sure to close stream silently
+ if ($closed) {
+ $stream->close();
+ return $stream;
+ }
+
+ // forward drain events for back pressure
+ $stream->on('drain', function () use ($out) {
+ $out->emit('drain', array($out));
+ });
+
+ // error events cancel output stream
+ $stream->on('error', function ($error) use ($out) {
+ $out->emit('error', array($error, $out));
+ $out->close();
+ });
+
+ // close both streams once either side closes
+ $stream->on('close', array($out, 'close'));
+ $out->on('close', array($stream, 'close'));
+
+ if ($buffer) {
+ // flush buffer to stream and check if its buffer is not exceeded
+ $drained = true;
+ foreach ($buffer as $chunk) {
+ if (!$stream->write($chunk)) {
+ $drained = false;
+ }
+ }
+ $buffer = array();
+
+ if ($drained) {
+ // signal drain event, because the output stream previous signalled a full buffer
+ $out->emit('drain', array($out));
+ }
+ }
+
+ if ($ending) {
+ $stream->end();
+ } else {
+ $store = $stream;
+ }
+
+ return $stream;
+ },
+ function ($e) use ($out, &$closed) {
+ if (!$closed) {
+ $out->emit('error', array($e, $out));
+ $out->close();
+ }
+ }
+ );
+ }
+
+ public function write($data)
+ {
+ if ($this->ending) {
+ return false;
+ }
+
+ // forward to inner stream if possible
+ if ($this->stream !== null) {
+ return $this->stream->write($data);
+ }
+
+ // append to buffer and signal the buffer is full
+ $this->buffer[] = $data;
+ return false;
+ }
+
+ public function end($data = null)
+ {
+ if ($this->ending) {
+ return;
+ }
+
+ $this->ending = true;
+
+ // forward to inner stream if possible
+ if ($this->stream !== null) {
+ return $this->stream->end($data);
+ }
+
+ // append to buffer
+ if ($data !== null) {
+ $this->buffer[] = $data;
+ }
+ }
+
+ public function isWritable()
+ {
+ return !$this->ending;
+ }
+
+ public function close()
+ {
+ if ($this->closed) {
+ return;
+ }
+
+ $this->buffer = array();
+ $this->ending = true;
+ $this->closed = true;
+
+ // try to cancel promise once the stream closes
+ if ($this->promise instanceof CancellablePromiseInterface) {
+ $this->promise->cancel();
+ }
+ $this->promise = $this->stream = null;
+
+ $this->emit('close');
+ $this->removeAllListeners();
+ }
+}
diff --git a/vendor/react/promise-stream/src/functions.php b/vendor/react/promise-stream/src/functions.php
new file mode 100644
index 0000000..da66de8
--- /dev/null
+++ b/vendor/react/promise-stream/src/functions.php
@@ -0,0 +1,370 @@
+<?php
+
+namespace React\Promise\Stream;
+
+use Evenement\EventEmitterInterface;
+use React\Promise;
+use React\Promise\PromiseInterface;
+use React\Stream\ReadableStreamInterface;
+use React\Stream\WritableStreamInterface;
+
+/**
+ * Create a `Promise` which will be fulfilled with the stream data buffer.
+ *
+ * ```php
+ * $stream = accessSomeJsonStream();
+ *
+ * React\Promise\Stream\buffer($stream)->then(function (string $contents) {
+ * var_dump(json_decode($contents));
+ * });
+ * ```
+ *
+ * The promise will be fulfilled with a `string` of all data chunks concatenated once the stream closes.
+ *
+ * The promise will be fulfilled with an empty `string` if the stream is already closed.
+ *
+ * The promise will be rejected with a `RuntimeException` if the stream emits an error.
+ *
+ * The promise will be rejected with a `RuntimeException` if it is cancelled.
+ *
+ * The optional `$maxLength` argument defaults to no limit. In case the maximum
+ * length is given and the stream emits more data before the end, the promise
+ * will be rejected with an `OverflowException`.
+ *
+ * ```php
+ * $stream = accessSomeToLargeStream();
+ *
+ * React\Promise\Stream\buffer($stream, 1024)->then(function ($contents) {
+ * var_dump(json_decode($contents));
+ * }, function ($error) {
+ * // Reaching here when the stream buffer goes above the max size,
+ * // in this example that is 1024 bytes,
+ * // or when the stream emits an error.
+ * });
+ * ```
+ *
+ * @param ReadableStreamInterface<string> $stream
+ * @param ?int $maxLength Maximum number of bytes to buffer or null for unlimited.
+ * @return PromiseInterface<string,\RuntimeException>
+ */
+function buffer(ReadableStreamInterface $stream, $maxLength = null)
+{
+ // stream already ended => resolve with empty buffer
+ if (!$stream->isReadable()) {
+ return Promise\resolve('');
+ }
+
+ $buffer = '';
+
+ $promise = new Promise\Promise(function ($resolve, $reject) use ($stream, $maxLength, &$buffer, &$bufferer) {
+ $bufferer = function ($data) use (&$buffer, $reject, $maxLength) {
+ $buffer .= $data;
+
+ if ($maxLength !== null && isset($buffer[$maxLength])) {
+ $reject(new \OverflowException('Buffer exceeded maximum length'));
+ }
+ };
+
+ $stream->on('data', $bufferer);
+
+ $stream->on('error', function (\Exception $e) use ($reject) {
+ $reject(new \RuntimeException(
+ 'An error occured on the underlying stream while buffering: ' . $e->getMessage(),
+ $e->getCode(),
+ $e
+ ));
+ });
+
+ $stream->on('close', function () use ($resolve, &$buffer) {
+ $resolve($buffer);
+ });
+ }, function ($_, $reject) {
+ $reject(new \RuntimeException('Cancelled buffering'));
+ });
+
+ return $promise->then(null, function (\Exception $error) use (&$buffer, $bufferer, $stream) {
+ // promise rejected => clear buffer and buffering
+ $buffer = '';
+ $stream->removeListener('data', $bufferer);
+
+ throw $error;
+ });
+}
+
+/**
+ * Create a `Promise` which will be fulfilled once the given event triggers for the first time.
+ *
+ * ```php
+ * $stream = accessSomeJsonStream();
+ *
+ * React\Promise\Stream\first($stream)->then(function (string $chunk) {
+ * echo 'The first chunk arrived: ' . $chunk;
+ * });
+ * ```
+ *
+ * The promise will be fulfilled with a `mixed` value of whatever the first event
+ * emitted or `null` if the event does not pass any data.
+ * If you do not pass a custom event name, then it will wait for the first "data"
+ * event.
+ * For common streams of type `ReadableStreamInterface<string>`, this means it will be
+ * fulfilled with a `string` containing the first data chunk.
+ *
+ * The promise will be rejected with a `RuntimeException` if the stream emits an error
+ * – unless you're waiting for the "error" event, in which case it will be fulfilled.
+ *
+ * The promise will be rejected with a `RuntimeException` once the stream closes
+ * – unless you're waiting for the "close" event, in which case it will be fulfilled.
+ *
+ * The promise will be rejected with a `RuntimeException` if the stream is already closed.
+ *
+ * The promise will be rejected with a `RuntimeException` if it is cancelled.
+ *
+ * @param ReadableStreamInterface|WritableStreamInterface $stream
+ * @param string $event
+ * @return PromiseInterface<mixed,\RuntimeException>
+ */
+function first(EventEmitterInterface $stream, $event = 'data')
+{
+ if ($stream instanceof ReadableStreamInterface) {
+ // readable or duplex stream not readable => already closed
+ // a half-open duplex stream is considered closed if its readable side is closed
+ if (!$stream->isReadable()) {
+ return Promise\reject(new \RuntimeException('Stream already closed'));
+ }
+ } elseif ($stream instanceof WritableStreamInterface) {
+ // writable-only stream (not duplex) not writable => already closed
+ if (!$stream->isWritable()) {
+ return Promise\reject(new \RuntimeException('Stream already closed'));
+ }
+ }
+
+ return new Promise\Promise(function ($resolve, $reject) use ($stream, $event, &$listener) {
+ $listener = function ($data = null) use ($stream, $event, &$listener, $resolve) {
+ $stream->removeListener($event, $listener);
+ $resolve($data);
+ };
+ $stream->on($event, $listener);
+
+ if ($event !== 'error') {
+ $stream->on('error', function (\Exception $e) use ($stream, $event, $listener, $reject) {
+ $stream->removeListener($event, $listener);
+ $reject(new \RuntimeException(
+ 'An error occured on the underlying stream while waiting for event: ' . $e->getMessage(),
+ $e->getCode(),
+ $e
+ ));
+ });
+ }
+
+ $stream->on('close', function () use ($stream, $event, $listener, $reject) {
+ $stream->removeListener($event, $listener);
+ $reject(new \RuntimeException('Stream closed'));
+ });
+ }, function ($_, $reject) use ($stream, $event, &$listener) {
+ $stream->removeListener($event, $listener);
+ $reject(new \RuntimeException('Operation cancelled'));
+ });
+}
+
+/**
+ * Create a `Promise` which will be fulfilled with an array of all the event data.
+ *
+ * ```php
+ * $stream = accessSomeJsonStream();
+ *
+ * React\Promise\Stream\all($stream)->then(function (array $chunks) {
+ * echo 'The stream consists of ' . count($chunks) . ' chunk(s)';
+ * });
+ * ```
+ *
+ * The promise will be fulfilled with an `array` once the stream closes. The array
+ * will contain whatever all events emitted or `null` values if the events do not pass any data.
+ * If you do not pass a custom event name, then it will wait for all the "data"
+ * events.
+ * For common streams of type `ReadableStreamInterface<string>`, this means it will be
+ * fulfilled with a `string[]` array containing all the data chunk.
+ *
+ * The promise will be fulfilled with an empty `array` if the stream is already closed.
+ *
+ * The promise will be rejected with a `RuntimeException` if the stream emits an error.
+ *
+ * The promise will be rejected with a `RuntimeException` if it is cancelled.
+ *
+ * @param ReadableStreamInterface|WritableStreamInterface $stream
+ * @param string $event
+ * @return PromiseInterface<array,\RuntimeException>
+ */
+function all(EventEmitterInterface $stream, $event = 'data')
+{
+ // stream already ended => resolve with empty buffer
+ if ($stream instanceof ReadableStreamInterface) {
+ // readable or duplex stream not readable => already closed
+ // a half-open duplex stream is considered closed if its readable side is closed
+ if (!$stream->isReadable()) {
+ return Promise\resolve(array());
+ }
+ } elseif ($stream instanceof WritableStreamInterface) {
+ // writable-only stream (not duplex) not writable => already closed
+ if (!$stream->isWritable()) {
+ return Promise\resolve(array());
+ }
+ }
+
+ $buffer = array();
+ $bufferer = function ($data = null) use (&$buffer) {
+ $buffer []= $data;
+ };
+ $stream->on($event, $bufferer);
+
+ $promise = new Promise\Promise(function ($resolve, $reject) use ($stream, &$buffer) {
+ $stream->on('error', function (\Exception $e) use ($reject) {
+ $reject(new \RuntimeException(
+ 'An error occured on the underlying stream while buffering: ' . $e->getMessage(),
+ $e->getCode(),
+ $e
+ ));
+ });
+
+ $stream->on('close', function () use ($resolve, &$buffer) {
+ $resolve($buffer);
+ });
+ }, function ($_, $reject) {
+ $reject(new \RuntimeException('Cancelled buffering'));
+ });
+
+ return $promise->then(null, function ($error) use (&$buffer, $bufferer, $stream, $event) {
+ // promise rejected => clear buffer and buffering
+ $buffer = array();
+ $stream->removeListener($event, $bufferer);
+
+ throw $error;
+ });
+}
+
+/**
+ * Unwrap a `Promise` which will be fulfilled with a `ReadableStreamInterface<T>`.
+ *
+ * This function returns a readable stream instance (implementing `ReadableStreamInterface<T>`)
+ * right away which acts as a proxy for the future promise resolution.
+ * Once the given Promise will be fulfilled with a `ReadableStreamInterface<T>`, its
+ * data will be piped to the output stream.
+ *
+ * ```php
+ * //$promise = someFunctionWhichResolvesWithAStream();
+ * $promise = startDownloadStream($uri);
+ *
+ * $stream = React\Promise\Stream\unwrapReadable($promise);
+ *
+ * $stream->on('data', function (string $data) {
+ * echo $data;
+ * });
+ *
+ * $stream->on('end', function () {
+ * echo 'DONE';
+ * });
+ * ```
+ *
+ * If the given promise is either rejected or fulfilled with anything but an
+ * instance of `ReadableStreamInterface`, then the output stream will emit
+ * an `error` event and close:
+ *
+ * ```php
+ * $promise = startDownloadStream($invalidUri);
+ *
+ * $stream = React\Promise\Stream\unwrapReadable($promise);
+ *
+ * $stream->on('error', function (Exception $error) {
+ * echo 'Error: ' . $error->getMessage();
+ * });
+ * ```
+ *
+ * The given `$promise` SHOULD be pending, i.e. it SHOULD NOT be fulfilled or rejected
+ * at the time of invoking this function.
+ * If the given promise is already settled and does not fulfill with an instance of
+ * `ReadableStreamInterface`, then you will not be able to receive the `error` event.
+ *
+ * You can `close()` the resulting stream at any time, which will either try to
+ * `cancel()` the pending promise or try to `close()` the underlying stream.
+ *
+ * ```php
+ * $promise = startDownloadStream($uri);
+ *
+ * $stream = React\Promise\Stream\unwrapReadable($promise);
+ *
+ * $loop->addTimer(2.0, function () use ($stream) {
+ * $stream->close();
+ * });
+ * ```
+ *
+ * @param PromiseInterface<ReadableStreamInterface<T>,\Exception> $promise
+ * @return ReadableStreamInterface<T>
+ */
+function unwrapReadable(PromiseInterface $promise)
+{
+ return new UnwrapReadableStream($promise);
+}
+
+/**
+ * unwrap a `Promise` which will be fulfilled with a `WritableStreamInterface<T>`.
+ *
+ * This function returns a writable stream instance (implementing `WritableStreamInterface<T>`)
+ * right away which acts as a proxy for the future promise resolution.
+ * Any writes to this instance will be buffered in memory for when the promise will
+ * be fulfilled.
+ * Once the given Promise will be fulfilled with a `WritableStreamInterface<T>`, any
+ * data you have written to the proxy will be forwarded transparently to the inner
+ * stream.
+ *
+ * ```php
+ * //$promise = someFunctionWhichResolvesWithAStream();
+ * $promise = startUploadStream($uri);
+ *
+ * $stream = React\Promise\Stream\unwrapWritable($promise);
+ *
+ * $stream->write('hello');
+ * $stream->end('world');
+ *
+ * $stream->on('close', function () {
+ * echo 'DONE';
+ * });
+ * ```
+ *
+ * If the given promise is either rejected or fulfilled with anything but an
+ * instance of `WritableStreamInterface`, then the output stream will emit
+ * an `error` event and close:
+ *
+ * ```php
+ * $promise = startUploadStream($invalidUri);
+ *
+ * $stream = React\Promise\Stream\unwrapWritable($promise);
+ *
+ * $stream->on('error', function (Exception $error) {
+ * echo 'Error: ' . $error->getMessage();
+ * });
+ * ```
+ *
+ * The given `$promise` SHOULD be pending, i.e. it SHOULD NOT be fulfilled or rejected
+ * at the time of invoking this function.
+ * If the given promise is already settled and does not fulfill with an instance of
+ * `WritableStreamInterface`, then you will not be able to receive the `error` event.
+ *
+ * You can `close()` the resulting stream at any time, which will either try to
+ * `cancel()` the pending promise or try to `close()` the underlying stream.
+ *
+ * ```php
+ * $promise = startUploadStream($uri);
+ *
+ * $stream = React\Promise\Stream\unwrapWritable($promise);
+ *
+ * $loop->addTimer(2.0, function () use ($stream) {
+ * $stream->close();
+ * });
+ * ```
+ *
+ * @param PromiseInterface<WritableStreamInterface<T>,\Exception> $promise
+ * @return WritableStreamInterface<T>
+ */
+function unwrapWritable(PromiseInterface $promise)
+{
+ return new UnwrapWritableStream($promise);
+}
diff --git a/vendor/react/promise-stream/src/functions_include.php b/vendor/react/promise-stream/src/functions_include.php
new file mode 100644
index 0000000..768a4fd
--- /dev/null
+++ b/vendor/react/promise-stream/src/functions_include.php
@@ -0,0 +1,7 @@
+<?php
+
+namespace React\Promise\Stream;
+
+if (!function_exists('React\Promise\Stream\buffer')) {
+ require __DIR__ . '/functions.php';
+}