summaryrefslogtreecommitdiffstats
path: root/vendor/gipfl/protocol/src/Generic
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 12:44:51 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 12:44:51 +0000
commita1ec78bf0dc93d0e05e5f066f1949dc3baecea06 (patch)
treeee596ce1bc9840661386f96f9b8d1f919a106317 /vendor/gipfl/protocol/src/Generic
parentInitial commit. (diff)
downloadicingaweb2-module-incubator-a1ec78bf0dc93d0e05e5f066f1949dc3baecea06.tar.xz
icingaweb2-module-incubator-a1ec78bf0dc93d0e05e5f066f1949dc3baecea06.zip
Adding upstream version 0.20.0.upstream/0.20.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/gipfl/protocol/src/Generic')
-rw-r--r--vendor/gipfl/protocol/src/Generic/AbstractStreamWrapper.php139
1 files changed, 139 insertions, 0 deletions
diff --git a/vendor/gipfl/protocol/src/Generic/AbstractStreamWrapper.php b/vendor/gipfl/protocol/src/Generic/AbstractStreamWrapper.php
new file mode 100644
index 0000000..12f6e82
--- /dev/null
+++ b/vendor/gipfl/protocol/src/Generic/AbstractStreamWrapper.php
@@ -0,0 +1,139 @@
+<?php
+
+namespace gipfl\Protocol\Generic;
+
+use Evenement\EventEmitterTrait;
+use Exception;
+use React\Stream\DuplexStreamInterface;
+use React\Stream\ReadableStreamInterface;
+use React\Stream\Util;
+use React\Stream\WritableStreamInterface;
+use RuntimeException;
+
+abstract class AbstractStreamWrapper implements DuplexStreamInterface
+{
+ use EventEmitterTrait;
+
+ /** @var ReadableStreamInterface */
+ protected $input;
+
+ /** @var WritableStreamInterface */
+ protected $output;
+
+ private $closed = false;
+
+ public function __construct(ReadableStreamInterface $in, WritableStreamInterface $out = null)
+ {
+ $this->readFrom($in);
+ if ($out === null && $in instanceof WritableStreamInterface) {
+ $this->writeTo($in);
+ } else {
+ $this->writeTo($out);
+ }
+ }
+
+ abstract public function handleData($data);
+
+ protected function readFrom(ReadableStreamInterface $input)
+ {
+ $this->input = $input;
+ if (! $input->isReadable()) {
+ $this->close();
+ return;
+ }
+ $input->on('data', function ($data) {
+ $this->handleData($data);
+ });
+ $input->on('end', function () {
+ $this->handleEnd();
+ });
+ $input->on('close', function () {
+ $this->close();
+ });
+ $input->on('error', function (Exception $error) {
+ $this->handleError($error);
+ });
+ }
+
+ protected function writeTo(WritableStreamInterface $output)
+ {
+ $this->output = $output;
+ if (! $this->output->isWritable()) {
+ $this->close();
+ throw new RuntimeException('Cannot write to output');
+ }
+
+ $output->on('drain', function () {
+ $this->handleDrain();
+ });
+ $output->on('close', function () {
+ $this->close();
+ });
+ $output->on('error', function (Exception $error) {
+ $this->handleError($error);
+ });
+ }
+
+ protected function handleDrain()
+ {
+ $this->emit('drain');
+ }
+
+ protected function handleEnd()
+ {
+ if (! $this->closed) {
+ $this->emit('end');
+ $this->close();
+ }
+ }
+
+ public function isReadable()
+ {
+ return !$this->closed && $this->input->isReadable();
+ }
+
+ public function isWritable()
+ {
+ return !$this->closed && $this->output->isWritable();
+ }
+
+ public function close()
+ {
+ if ($this->closed) {
+ return;
+ }
+
+ $this->closed = true;
+ $this->input->close();
+ $this->output->close();
+
+ $this->emit('close');
+ $this->removeAllListeners();
+ }
+
+ public function pause()
+ {
+ $this->input->pause();
+ }
+
+ public function resume()
+ {
+ $this->input->resume();
+ }
+
+ public function pipe(WritableStreamInterface $dest, array $options = [])
+ {
+ Util::pipe($this, $dest, $options);
+
+ return $dest;
+ }
+
+ /**
+ * @param Exception $error
+ */
+ protected function handleError(Exception $error)
+ {
+ $this->emit('error', [$error]);
+ $this->close();
+ }
+}