summaryrefslogtreecommitdiffstats
path: root/vendor/clue/buzz-react/src/Message/ReadableBodyStream.php
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/clue/buzz-react/src/Message/ReadableBodyStream.php')
-rw-r--r--vendor/clue/buzz-react/src/Message/ReadableBodyStream.php153
1 files changed, 153 insertions, 0 deletions
diff --git a/vendor/clue/buzz-react/src/Message/ReadableBodyStream.php b/vendor/clue/buzz-react/src/Message/ReadableBodyStream.php
new file mode 100644
index 0000000..eac27f7
--- /dev/null
+++ b/vendor/clue/buzz-react/src/Message/ReadableBodyStream.php
@@ -0,0 +1,153 @@
+<?php
+
+namespace Clue\React\Buzz\Message;
+
+use Evenement\EventEmitter;
+use Psr\Http\Message\StreamInterface;
+use React\Stream\ReadableStreamInterface;
+use React\Stream\Util;
+use React\Stream\WritableStreamInterface;
+
+/**
+ * @internal
+ */
+class ReadableBodyStream extends EventEmitter implements ReadableStreamInterface, StreamInterface
+{
+ private $input;
+ private $position = 0;
+ private $size;
+ private $closed = false;
+
+ public function __construct(ReadableStreamInterface $input, $size = null)
+ {
+ $this->input = $input;
+ $this->size = $size;
+
+ $that = $this;
+ $pos =& $this->position;
+ $input->on('data', function ($data) use ($that, &$pos, $size) {
+ $that->emit('data', array($data));
+
+ $pos += \strlen($data);
+ if ($size !== null && $pos >= $size) {
+ $that->handleEnd();
+ }
+ });
+ $input->on('error', function ($error) use ($that) {
+ $that->emit('error', array($error));
+ $that->close();
+ });
+ $input->on('end', array($that, 'handleEnd'));
+ $input->on('close', array($that, 'close'));
+ }
+
+ public function close()
+ {
+ if (!$this->closed) {
+ $this->closed = true;
+ $this->input->close();
+
+ $this->emit('close');
+ $this->removeAllListeners();
+ }
+ }
+
+ public function isReadable()
+ {
+ return $this->input->isReadable();
+ }
+
+ public function pause()
+ {
+ $this->input->pause();
+ }
+
+ public function resume()
+ {
+ $this->input->resume();
+ }
+
+ public function pipe(WritableStreamInterface $dest, array $options = array())
+ {
+ Util::pipe($this, $dest, $options);
+
+ return $dest;
+ }
+
+ public function eof()
+ {
+ return !$this->isReadable();
+ }
+
+ public function __toString()
+ {
+ return '';
+ }
+
+ public function detach()
+ {
+ throw new \BadMethodCallException();
+ }
+
+ public function getSize()
+ {
+ return $this->size;
+ }
+
+ public function tell()
+ {
+ throw new \BadMethodCallException();
+ }
+
+ public function isSeekable()
+ {
+ return false;
+ }
+
+ public function seek($offset, $whence = SEEK_SET)
+ {
+ throw new \BadMethodCallException();
+ }
+
+ public function rewind()
+ {
+ throw new \BadMethodCallException();
+ }
+
+ public function isWritable()
+ {
+ return false;
+ }
+
+ public function write($string)
+ {
+ throw new \BadMethodCallException();
+ }
+
+ public function read($length)
+ {
+ throw new \BadMethodCallException();
+ }
+
+ public function getContents()
+ {
+ throw new \BadMethodCallException();
+ }
+
+ public function getMetadata($key = null)
+ {
+ return ($key === null) ? array() : null;
+ }
+
+ /** @internal */
+ public function handleEnd()
+ {
+ if ($this->position !== $this->size && $this->size !== null) {
+ $this->emit('error', array(new \UnderflowException('Unexpected end of response body after ' . $this->position . '/' . $this->size . ' bytes')));
+ } else {
+ $this->emit('end');
+ }
+
+ $this->close();
+ }
+}