summaryrefslogtreecommitdiffstats
path: root/vendor/clue/connection-manager-extra/src/Multiple
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/clue/connection-manager-extra/src/Multiple')
-rw-r--r--vendor/clue/connection-manager-extra/src/Multiple/ConnectionManagerConcurrent.php36
-rw-r--r--vendor/clue/connection-manager-extra/src/Multiple/ConnectionManagerConsecutive.php62
-rw-r--r--vendor/clue/connection-manager-extra/src/Multiple/ConnectionManagerRandom.php14
-rw-r--r--vendor/clue/connection-manager-extra/src/Multiple/ConnectionManagerSelective.php111
4 files changed, 223 insertions, 0 deletions
diff --git a/vendor/clue/connection-manager-extra/src/Multiple/ConnectionManagerConcurrent.php b/vendor/clue/connection-manager-extra/src/Multiple/ConnectionManagerConcurrent.php
new file mode 100644
index 0000000..c1eb9cf
--- /dev/null
+++ b/vendor/clue/connection-manager-extra/src/Multiple/ConnectionManagerConcurrent.php
@@ -0,0 +1,36 @@
+<?php
+
+namespace ConnectionManager\Extra\Multiple;
+
+use ConnectionManager\Extra\Multiple\ConnectionManagerConsecutive;
+use React\Promise;
+use React\Promise\CancellablePromiseInterface;
+
+class ConnectionManagerConcurrent extends ConnectionManagerConsecutive
+{
+ public function connect($uri)
+ {
+ $all = array();
+ foreach ($this->managers as $connector) {
+ /* @var $connection Connector */
+ $all []= $connector->connect($uri);
+ }
+ return Promise\any($all)->then(function ($conn) use ($all) {
+ // a connection attempt succeeded
+ // => cancel all pending connection attempts
+ foreach ($all as $promise) {
+ if ($promise instanceof CancellablePromiseInterface) {
+ $promise->cancel();
+ }
+
+ // if promise resolves despite cancellation, immediately close stream
+ $promise->then(function ($stream) use ($conn) {
+ if ($stream !== $conn) {
+ $stream->close();
+ }
+ });
+ }
+ return $conn;
+ });
+ }
+}
diff --git a/vendor/clue/connection-manager-extra/src/Multiple/ConnectionManagerConsecutive.php b/vendor/clue/connection-manager-extra/src/Multiple/ConnectionManagerConsecutive.php
new file mode 100644
index 0000000..1474b85
--- /dev/null
+++ b/vendor/clue/connection-manager-extra/src/Multiple/ConnectionManagerConsecutive.php
@@ -0,0 +1,62 @@
+<?php
+
+namespace ConnectionManager\Extra\Multiple;
+
+use React\Socket\ConnectorInterface;
+use React\Promise;
+use UnderflowException;
+use React\Promise\CancellablePromiseInterface;
+
+class ConnectionManagerConsecutive implements ConnectorInterface
+{
+ protected $managers;
+
+ /**
+ *
+ * @param ConnectorInterface[] $managers
+ */
+ public function __construct(array $managers)
+ {
+ if (!$managers) {
+ throw new \InvalidArgumentException('List of connectors must not be empty');
+ }
+ $this->managers = $managers;
+ }
+
+ public function connect($uri)
+ {
+ return $this->tryConnection($this->managers, $uri);
+ }
+
+ /**
+ *
+ * @param ConnectorInterface[] $managers
+ * @param string $uri
+ * @return Promise
+ * @internal
+ */
+ public function tryConnection(array $managers, $uri)
+ {
+ return new Promise\Promise(function ($resolve, $reject) use (&$managers, &$pending, $uri) {
+ $try = function () use (&$try, &$managers, $uri, $resolve, $reject, &$pending) {
+ if (!$managers) {
+ return $reject(new UnderflowException('No more managers to try to connect through'));
+ }
+
+ $manager = array_shift($managers);
+ $pending = $manager->connect($uri);
+ $pending->then($resolve, $try);
+ };
+
+ $try();
+ }, function ($_, $reject) use (&$managers, &$pending) {
+ // stop retrying, reject results and cancel pending attempt
+ $managers = array();
+ $reject(new \RuntimeException('Cancelled'));
+
+ if ($pending instanceof CancellablePromiseInterface) {
+ $pending->cancel();
+ }
+ });
+ }
+}
diff --git a/vendor/clue/connection-manager-extra/src/Multiple/ConnectionManagerRandom.php b/vendor/clue/connection-manager-extra/src/Multiple/ConnectionManagerRandom.php
new file mode 100644
index 0000000..88d1fd6
--- /dev/null
+++ b/vendor/clue/connection-manager-extra/src/Multiple/ConnectionManagerRandom.php
@@ -0,0 +1,14 @@
+<?php
+
+namespace ConnectionManager\Extra\Multiple;
+
+class ConnectionManagerRandom extends ConnectionManagerConsecutive
+{
+ public function connect($uri)
+ {
+ $managers = $this->managers;
+ shuffle($managers);
+
+ return $this->tryConnection($managers, $uri);
+ }
+}
diff --git a/vendor/clue/connection-manager-extra/src/Multiple/ConnectionManagerSelective.php b/vendor/clue/connection-manager-extra/src/Multiple/ConnectionManagerSelective.php
new file mode 100644
index 0000000..859ea90
--- /dev/null
+++ b/vendor/clue/connection-manager-extra/src/Multiple/ConnectionManagerSelective.php
@@ -0,0 +1,111 @@
+<?php
+
+namespace ConnectionManager\Extra\Multiple;
+
+use React\Socket\ConnectorInterface;
+use React\Promise;
+use UnderflowException;
+use InvalidArgumentException;
+
+class ConnectionManagerSelective implements ConnectorInterface
+{
+ private $managers;
+
+ /**
+ *
+ * @param ConnectorInterface[] $managers
+ */
+ public function __construct(array $managers)
+ {
+ foreach ($managers as $filter => $manager) {
+ $host = $filter;
+ $portMin = 0;
+ $portMax = 65535;
+
+ // search colon (either single one OR preceded by "]" due to IPv6)
+ $colon = strrpos($host, ':');
+ if ($colon !== false && (strpos($host, ':') === $colon || substr($host, $colon - 1, 1) === ']' )) {
+ if (!isset($host[$colon + 1])) {
+ throw new InvalidArgumentException('Entry "' . $filter . '" has no port after colon');
+ }
+
+ $minus = strpos($host, '-', $colon);
+ if ($minus === false) {
+ $portMin = $portMax = (int)substr($host, $colon + 1);
+
+ if (substr($host, $colon + 1) !== (string)$portMin) {
+ throw new InvalidArgumentException('Entry "' . $filter . '" has no valid port after colon');
+ }
+ } else {
+ $portMin = (int)substr($host, $colon + 1, ($minus - $colon));
+ $portMax = (int)substr($host, $minus + 1);
+
+ if (substr($host, $colon + 1) !== ($portMin . '-' . $portMax)) {
+ throw new InvalidArgumentException('Entry "' . $filter . '" has no valid port range after colon');
+ }
+
+ if ($portMin > $portMax) {
+ throw new InvalidArgumentException('Entry "' . $filter . '" has port range mixed up');
+ }
+ }
+ $host = substr($host, 0, $colon);
+ }
+
+ if ($host === '') {
+ throw new InvalidArgumentException('Entry "' . $filter . '" has an empty host');
+ }
+
+ if (!$manager instanceof ConnectorInterface) {
+ throw new InvalidArgumentException('Entry "' . $filter . '" is not a valid connector');
+ }
+ }
+
+ $this->managers = $managers;
+ }
+
+ public function connect($uri)
+ {
+ $parts = parse_url((strpos($uri, '://') === false ? 'tcp://' : '') . $uri);
+ if (!isset($parts) || !isset($parts['scheme'], $parts['host'], $parts['port'])) {
+ return Promise\reject(new InvalidArgumentException('Invalid URI'));
+ }
+
+ $connector = $this->getConnectorForTarget(
+ trim($parts['host'], '[]'),
+ $parts['port']
+ );
+
+ if ($connector === null) {
+ return Promise\reject(new UnderflowException('No connector for given target found'));
+ }
+
+ return $connector->connect($uri);
+ }
+
+ private function getConnectorForTarget($targetHost, $targetPort)
+ {
+ foreach ($this->managers as $host => $connector) {
+ $portMin = 0;
+ $portMax = 65535;
+
+ // search colon (either single one OR preceded by "]" due to IPv6)
+ $colon = strrpos($host, ':');
+ if ($colon !== false && (strpos($host, ':') === $colon || substr($host, $colon - 1, 1) === ']' )) {
+ $minus = strpos($host, '-', $colon);
+ if ($minus === false) {
+ $portMin = $portMax = (int)substr($host, $colon + 1);
+ } else {
+ $portMin = (int)substr($host, $colon + 1, ($minus - $colon));
+ $portMax = (int)substr($host, $minus + 1);
+ }
+ $host = trim(substr($host, 0, $colon), '[]');
+ }
+
+ if ($targetPort >= $portMin && $targetPort <= $portMax && fnmatch($host, $targetHost)) {
+ return $connector;
+ }
+ }
+
+ return null;
+ }
+}