diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 12:44:51 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 12:44:51 +0000 |
commit | a1ec78bf0dc93d0e05e5f066f1949dc3baecea06 (patch) | |
tree | ee596ce1bc9840661386f96f9b8d1f919a106317 /vendor/gipfl/protocol/src/Generic | |
parent | Initial commit. (diff) | |
download | icingaweb2-module-incubator-a1ec78bf0dc93d0e05e5f066f1949dc3baecea06.tar.xz icingaweb2-module-incubator-a1ec78bf0dc93d0e05e5f066f1949dc3baecea06.zip |
Adding upstream version 0.20.0.upstream/0.20.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/gipfl/protocol/src/Generic')
-rw-r--r-- | vendor/gipfl/protocol/src/Generic/AbstractStreamWrapper.php | 139 |
1 files changed, 139 insertions, 0 deletions
diff --git a/vendor/gipfl/protocol/src/Generic/AbstractStreamWrapper.php b/vendor/gipfl/protocol/src/Generic/AbstractStreamWrapper.php new file mode 100644 index 0000000..12f6e82 --- /dev/null +++ b/vendor/gipfl/protocol/src/Generic/AbstractStreamWrapper.php @@ -0,0 +1,139 @@ +<?php + +namespace gipfl\Protocol\Generic; + +use Evenement\EventEmitterTrait; +use Exception; +use React\Stream\DuplexStreamInterface; +use React\Stream\ReadableStreamInterface; +use React\Stream\Util; +use React\Stream\WritableStreamInterface; +use RuntimeException; + +abstract class AbstractStreamWrapper implements DuplexStreamInterface +{ + use EventEmitterTrait; + + /** @var ReadableStreamInterface */ + protected $input; + + /** @var WritableStreamInterface */ + protected $output; + + private $closed = false; + + public function __construct(ReadableStreamInterface $in, WritableStreamInterface $out = null) + { + $this->readFrom($in); + if ($out === null && $in instanceof WritableStreamInterface) { + $this->writeTo($in); + } else { + $this->writeTo($out); + } + } + + abstract public function handleData($data); + + protected function readFrom(ReadableStreamInterface $input) + { + $this->input = $input; + if (! $input->isReadable()) { + $this->close(); + return; + } + $input->on('data', function ($data) { + $this->handleData($data); + }); + $input->on('end', function () { + $this->handleEnd(); + }); + $input->on('close', function () { + $this->close(); + }); + $input->on('error', function (Exception $error) { + $this->handleError($error); + }); + } + + protected function writeTo(WritableStreamInterface $output) + { + $this->output = $output; + if (! $this->output->isWritable()) { + $this->close(); + throw new RuntimeException('Cannot write to output'); + } + + $output->on('drain', function () { + $this->handleDrain(); + }); + $output->on('close', function () { + $this->close(); + }); + $output->on('error', function (Exception $error) { + $this->handleError($error); + }); + } + + protected function handleDrain() + { + $this->emit('drain'); + } + + protected function handleEnd() + { + if (! $this->closed) { + $this->emit('end'); + $this->close(); + } + } + + public function isReadable() + { + return !$this->closed && $this->input->isReadable(); + } + + public function isWritable() + { + return !$this->closed && $this->output->isWritable(); + } + + public function close() + { + if ($this->closed) { + return; + } + + $this->closed = true; + $this->input->close(); + $this->output->close(); + + $this->emit('close'); + $this->removeAllListeners(); + } + + public function pause() + { + $this->input->pause(); + } + + public function resume() + { + $this->input->resume(); + } + + public function pipe(WritableStreamInterface $dest, array $options = []) + { + Util::pipe($this, $dest, $options); + + return $dest; + } + + /** + * @param Exception $error + */ + protected function handleError(Exception $error) + { + $this->emit('error', [$error]); + $this->close(); + } +} |