From a1ec78bf0dc93d0e05e5f066f1949dc3baecea06 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 28 Apr 2024 14:44:51 +0200 Subject: Adding upstream version 0.20.0. Signed-off-by: Daniel Baumann --- .../protocol/src/Generic/AbstractStreamWrapper.php | 139 +++++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 vendor/gipfl/protocol/src/Generic/AbstractStreamWrapper.php (limited to 'vendor/gipfl/protocol/src/Generic') diff --git a/vendor/gipfl/protocol/src/Generic/AbstractStreamWrapper.php b/vendor/gipfl/protocol/src/Generic/AbstractStreamWrapper.php new file mode 100644 index 0000000..12f6e82 --- /dev/null +++ b/vendor/gipfl/protocol/src/Generic/AbstractStreamWrapper.php @@ -0,0 +1,139 @@ +readFrom($in); + if ($out === null && $in instanceof WritableStreamInterface) { + $this->writeTo($in); + } else { + $this->writeTo($out); + } + } + + abstract public function handleData($data); + + protected function readFrom(ReadableStreamInterface $input) + { + $this->input = $input; + if (! $input->isReadable()) { + $this->close(); + return; + } + $input->on('data', function ($data) { + $this->handleData($data); + }); + $input->on('end', function () { + $this->handleEnd(); + }); + $input->on('close', function () { + $this->close(); + }); + $input->on('error', function (Exception $error) { + $this->handleError($error); + }); + } + + protected function writeTo(WritableStreamInterface $output) + { + $this->output = $output; + if (! $this->output->isWritable()) { + $this->close(); + throw new RuntimeException('Cannot write to output'); + } + + $output->on('drain', function () { + $this->handleDrain(); + }); + $output->on('close', function () { + $this->close(); + }); + $output->on('error', function (Exception $error) { + $this->handleError($error); + }); + } + + protected function handleDrain() + { + $this->emit('drain'); + } + + protected function handleEnd() + { + if (! $this->closed) { + $this->emit('end'); + $this->close(); + } + } + + public function isReadable() + { + return !$this->closed && $this->input->isReadable(); + } + + public function isWritable() + { + return !$this->closed && $this->output->isWritable(); + } + + public function close() + { + if ($this->closed) { + return; + } + + $this->closed = true; + $this->input->close(); + $this->output->close(); + + $this->emit('close'); + $this->removeAllListeners(); + } + + public function pause() + { + $this->input->pause(); + } + + public function resume() + { + $this->input->resume(); + } + + public function pipe(WritableStreamInterface $dest, array $options = []) + { + Util::pipe($this, $dest, $options); + + return $dest; + } + + /** + * @param Exception $error + */ + protected function handleError(Exception $error) + { + $this->emit('error', [$error]); + $this->close(); + } +} -- cgit v1.2.3