summaryrefslogtreecommitdiffstats
path: root/vendor/gipfl/influxdb
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 12:44:51 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 12:44:51 +0000
commita1ec78bf0dc93d0e05e5f066f1949dc3baecea06 (patch)
treeee596ce1bc9840661386f96f9b8d1f919a106317 /vendor/gipfl/influxdb
parentInitial commit. (diff)
downloadicingaweb2-module-incubator-a1ec78bf0dc93d0e05e5f066f1949dc3baecea06.tar.xz
icingaweb2-module-incubator-a1ec78bf0dc93d0e05e5f066f1949dc3baecea06.zip
Adding upstream version 0.20.0.upstream/0.20.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/gipfl/influxdb')
-rw-r--r--vendor/gipfl/influxdb/LICENSE21
-rw-r--r--vendor/gipfl/influxdb/composer.json25
-rw-r--r--vendor/gipfl/influxdb/src/ChunkedInfluxDbWriter.php158
-rw-r--r--vendor/gipfl/influxdb/src/DataPoint.php63
-rw-r--r--vendor/gipfl/influxdb/src/Escape.php67
-rw-r--r--vendor/gipfl/influxdb/src/InfluxDbConnection.php24
-rw-r--r--vendor/gipfl/influxdb/src/InfluxDbConnectionFactory.php38
-rw-r--r--vendor/gipfl/influxdb/src/InfluxDbConnectionV1.php311
-rw-r--r--vendor/gipfl/influxdb/src/InfluxDbConnectionV2.php270
-rw-r--r--vendor/gipfl/influxdb/src/InfluxDbQueryResult.php65
-rw-r--r--vendor/gipfl/influxdb/src/LineProtocol.php63
11 files changed, 1105 insertions, 0 deletions
diff --git a/vendor/gipfl/influxdb/LICENSE b/vendor/gipfl/influxdb/LICENSE
new file mode 100644
index 0000000..dd88e09
--- /dev/null
+++ b/vendor/gipfl/influxdb/LICENSE
@@ -0,0 +1,21 @@
+The MIT License
+
+Copyright (c) 2018 Thomas Gelf
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/vendor/gipfl/influxdb/composer.json b/vendor/gipfl/influxdb/composer.json
new file mode 100644
index 0000000..8c9aec4
--- /dev/null
+++ b/vendor/gipfl/influxdb/composer.json
@@ -0,0 +1,25 @@
+{
+ "name": "gipfl/influxdb",
+ "description": "InfluxDB client library",
+ "type": "library",
+ "license": "MIT",
+ "autoload": {
+ "psr-4": {
+ "gipfl\\InfluxDb\\": "src/"
+ }
+ },
+ "authors": [
+ {
+ "name": "Thomas Gelf",
+ "email": "thomas@gelf.net"
+ }
+ ],
+ "require": {
+ "php": ">=5.6.0",
+ "ext-ctype": "*",
+ "ext-pcntl": "*",
+ "gipfl/curl": ">=0.1.1",
+ "react/event-loop": ">=1.1",
+ "gipfl/json": ">=0.2"
+ }
+}
diff --git a/vendor/gipfl/influxdb/src/ChunkedInfluxDbWriter.php b/vendor/gipfl/influxdb/src/ChunkedInfluxDbWriter.php
new file mode 100644
index 0000000..37473a7
--- /dev/null
+++ b/vendor/gipfl/influxdb/src/ChunkedInfluxDbWriter.php
@@ -0,0 +1,158 @@
+<?php
+
+namespace gipfl\InfluxDb;
+
+use gipfl\Curl\RequestError;
+use Psr\Http\Message\ResponseInterface;
+use Psr\Log\LoggerAwareInterface;
+use Psr\Log\LoggerAwareTrait;
+use Psr\Log\NullLogger;
+use React\EventLoop\LoopInterface;
+use React\EventLoop\TimerInterface;
+
+/**
+ * Gives no result, enqueue and forget
+ */
+class ChunkedInfluxDbWriter implements LoggerAwareInterface
+{
+ use LoggerAwareTrait;
+
+ const DEFAULT_BUFFER_SIZE = 5000;
+
+ const DEFAULT_FLUSH_INTERVAL = 0.2;
+
+ const DEFAULT_PRECISION = 's';
+
+ /** @var int */
+ protected $bufferSize = self::DEFAULT_BUFFER_SIZE;
+
+ /** @var float */
+ protected $flushInterval = self::DEFAULT_FLUSH_INTERVAL;
+
+ /** @var string */
+ protected $precision = self::DEFAULT_PRECISION;
+
+ /** @var DataPoint[] */
+ protected $buffer = [];
+
+ /** @var InfluxDbConnection */
+ protected $connection;
+
+ /** @var string */
+ protected $dbName;
+
+ /** @var LoopInterface */
+ protected $loop;
+
+ /** @var ?TimerInterface */
+ protected $flushTimer;
+
+ public function __construct(InfluxDbConnection $connection, $dbName, LoopInterface $loop)
+ {
+ $this->setLogger(new NullLogger());
+ $this->connection = $connection;
+ $this->dbName = $dbName;
+ $this->loop = $loop;
+ }
+
+ /**
+ * @param DataPoint $point
+ */
+ public function enqueue(DataPoint $point)
+ {
+ $this->buffer[] = $point;
+ $count = count($this->buffer);
+ if ($count >= $this->bufferSize) {
+ $this->flush();
+ } else {
+ $this->startFlushTimer();
+ }
+ }
+
+ /**
+ * @param int $bufferSize
+ * @return ChunkedInfluxDbWriter
+ */
+ public function setBufferSize($bufferSize)
+ {
+ $this->bufferSize = $bufferSize;
+ return $this;
+ }
+
+ /**
+ * @param float $flushInterval
+ * @return ChunkedInfluxDbWriter
+ */
+ public function setFlushInterval($flushInterval)
+ {
+ $this->flushInterval = $flushInterval;
+ return $this;
+ }
+
+ /**
+ * @param string $precision ns,u,ms,s,m,h
+ * @return ChunkedInfluxDbWriter
+ */
+ public function setPrecision($precision)
+ {
+ $this->precision = $precision;
+ return $this;
+ }
+
+ public function flush()
+ {
+ $buffer = $this->buffer;
+ $this->buffer = [];
+ $this->stopFlushTimer();
+ $this->logger->debug(sprintf('Flushing InfluxDB buffer, sending %d data points', count($buffer)));
+ $start = microtime(true);
+ $this->connection->writeDataPoints($this->dbName, $buffer, $this->precision)
+ ->then(function (ResponseInterface $response) use ($start) {
+ $code = $response->getStatusCode();
+ $duration = (microtime(true) - $start) * 1000;
+ if ($code > 199 && $code < 300) {
+ $this->logger->debug(sprintf('Got response from InfluxDB after %.2Fms', $duration));
+ } else {
+ $this->logger->error(sprintf(
+ 'Got unexpected %d from InfluxDB after %.2Fms: %s',
+ $code,
+ $duration,
+ $response->getReasonPhrase()
+ ));
+ }
+ }, function (RequestError $e) {
+ $this->logger->error($e->getMessage());
+ })->done();
+ }
+
+ public function stop()
+ {
+ $this->flush();
+ }
+
+ protected function startFlushTimer()
+ {
+ if ($this->flushTimer === null) {
+ $this->flushTimer = $this->loop->addPeriodicTimer($this->flushInterval, function () {
+ if (! empty($this->buffer)) {
+ $this->flush();
+ }
+ });
+ }
+ }
+
+ protected function stopFlushTimer()
+ {
+ if ($this->flushTimer) {
+ $this->loop->cancelTimer($this->flushTimer);
+ $this->flushTimer = null;
+ }
+ }
+
+ public function __destruct()
+ {
+ $this->stopFlushTimer();
+ $this->loop = null;
+ $this->connection = null;
+ }
+}
diff --git a/vendor/gipfl/influxdb/src/DataPoint.php b/vendor/gipfl/influxdb/src/DataPoint.php
new file mode 100644
index 0000000..f272206
--- /dev/null
+++ b/vendor/gipfl/influxdb/src/DataPoint.php
@@ -0,0 +1,63 @@
+<?php
+
+namespace gipfl\InfluxDb;
+
+use InvalidArgumentException;
+use function array_key_exists;
+use function array_merge;
+use function is_array;
+use function is_object;
+use function ksort;
+
+class DataPoint
+{
+ protected $timestamp;
+
+ protected $measurement;
+
+ protected $tags = [];
+
+ protected $fields;
+
+ public function __construct($measurement, $tags = [], $fields = [], $timestamp = null)
+ {
+ $this->measurement = (string) $measurement;
+ if ($timestamp !== null) {
+ $this->timestamp = $timestamp;
+ }
+
+ if (! empty($tags)) {
+ $this->addTags($tags);
+ }
+
+ if (is_array($fields) || is_object($fields)) {
+ $this->fields = (array) $fields;
+ } else {
+ $this->fields = ['value' => $fields];
+ }
+
+ if (empty($this->fields)) {
+ throw new InvalidArgumentException('At least one field/value is required');
+ }
+ }
+
+ public function addTags($tags)
+ {
+ $this->tags = array_merge($this->tags, (array) $tags);
+ ksort($this->tags);
+ }
+
+ public function getTag($name, $default = null)
+ {
+ if (array_key_exists($name, $this->tags)) {
+ return $this->tags[$name];
+ } else {
+ return $default;
+ }
+ }
+
+ public function __toString()
+ {
+ return LineProtocol::renderMeasurement($this->measurement, $this->tags, $this->fields, $this->timestamp);
+ }
+}
diff --git a/vendor/gipfl/influxdb/src/Escape.php b/vendor/gipfl/influxdb/src/Escape.php
new file mode 100644
index 0000000..e6cb555
--- /dev/null
+++ b/vendor/gipfl/influxdb/src/Escape.php
@@ -0,0 +1,67 @@
+<?php
+
+namespace gipfl\InfluxDb;
+
+use InvalidArgumentException;
+use function addcslashes;
+use function ctype_digit;
+use function is_bool;
+use function is_int;
+use function is_null;
+use function preg_match;
+use function strpos;
+
+abstract class Escape
+{
+ const ESCAPE_COMMA_SPACE = ' ,\\';
+
+ const ESCAPE_COMMA_EQUAL_SPACE = ' =,\\';
+
+ const ESCAPE_DOUBLE_QUOTES = '"\\';
+
+ const NULL = 'null';
+
+ const TRUE = 'true';
+
+ const FALSE = 'false';
+
+ public static function measurement($value)
+ {
+ static::assertNoNewline($value);
+ return addcslashes($value, self::ESCAPE_COMMA_SPACE);
+ }
+
+ public static function key($value)
+ {
+ static::assertNoNewline($value);
+ return addcslashes($value, self::ESCAPE_COMMA_EQUAL_SPACE);
+ }
+
+ public static function tagValue($value)
+ {
+ static::assertNoNewline($value);
+ return addcslashes($value, self::ESCAPE_COMMA_EQUAL_SPACE);
+ }
+
+ public static function fieldValue($value)
+ {
+ // Faster checks first
+ if (is_int($value) || ctype_digit($value) || preg_match('/^-\d+$/', $value)) {
+ return "{$value}i";
+ } elseif (is_bool($value)) {
+ return $value ? self::TRUE : self::FALSE;
+ } elseif (is_null($value)) {
+ return self::NULL;
+ } else {
+ static::assertNoNewline($value);
+ return '"' . addcslashes($value, self::ESCAPE_DOUBLE_QUOTES) . '"';
+ }
+ }
+
+ protected static function assertNoNewline($value)
+ {
+ if (strpos($value, "\n") !== false) {
+ throw new InvalidArgumentException('Newlines are forbidden in InfluxDB line protocol');
+ }
+ }
+}
diff --git a/vendor/gipfl/influxdb/src/InfluxDbConnection.php b/vendor/gipfl/influxdb/src/InfluxDbConnection.php
new file mode 100644
index 0000000..d20944a
--- /dev/null
+++ b/vendor/gipfl/influxdb/src/InfluxDbConnection.php
@@ -0,0 +1,24 @@
+<?php
+
+namespace gipfl\InfluxDb;
+
+interface InfluxDbConnection
+{
+ public function ping($verbose = false);
+
+ public function getVersion();
+
+ public function listDatabases();
+
+ public function createDatabase($name);
+
+ public function getHealth();
+
+ /**
+ * @param string $dbName
+ * @param DataPoint[] $dataPoints
+ * @param string|null $precision ns,u,ms,s,m,h
+ * @return \React\Promise\Promise
+ */
+ public function writeDataPoints($dbName, array $dataPoints, $precision = null);
+}
diff --git a/vendor/gipfl/influxdb/src/InfluxDbConnectionFactory.php b/vendor/gipfl/influxdb/src/InfluxDbConnectionFactory.php
new file mode 100644
index 0000000..f260010
--- /dev/null
+++ b/vendor/gipfl/influxdb/src/InfluxDbConnectionFactory.php
@@ -0,0 +1,38 @@
+<?php
+
+namespace gipfl\InfluxDb;
+
+use gipfl\Curl\CurlAsync;
+use React\EventLoop\LoopInterface;
+use React\Promise\Promise;
+use RuntimeException;
+
+abstract class InfluxDbConnectionFactory
+{
+ /**
+ * AsyncInfluxDbWriter constructor.
+ * @param LoopInterface $loop
+ * @param $baseUrl string InfluxDB base URL
+ * @param string|null $username
+ * @param string|null $password
+ * @return Promise <InfluxDbConnection>
+ */
+ public static function create(CurlAsync $curl, $baseUrl, $username = null, $password = null)
+ {
+ $v1 = new InfluxDbConnectionV1($curl, $baseUrl);
+ return $v1->getVersion()->then(function ($version) use ($baseUrl, $username, $password, $curl, $v1) {
+ if ($version === null || preg_match('/^v?2\./', $version)) {
+ $v2 = new InfluxDbConnectionV2($curl, $baseUrl, $username, $password);
+ return $v2->getVersion()->then(function ($version) use ($v2) {
+ if ($version === null) {
+ throw new RuntimeException('Unable to detect InfluxDb version');
+ } else {
+ return $v2;
+ }
+ });
+ } else {
+ return $v1;
+ }
+ });
+ }
+}
diff --git a/vendor/gipfl/influxdb/src/InfluxDbConnectionV1.php b/vendor/gipfl/influxdb/src/InfluxDbConnectionV1.php
new file mode 100644
index 0000000..0b674c2
--- /dev/null
+++ b/vendor/gipfl/influxdb/src/InfluxDbConnectionV1.php
@@ -0,0 +1,311 @@
+<?php
+
+namespace gipfl\InfluxDb;
+
+use gipfl\Curl\CurlAsync;
+use gipfl\Json\JsonString;
+use Psr\Http\Message\ResponseInterface;
+use Ramsey\Uuid\Uuid;
+use React\Promise\Promise;
+use function React\Promise\resolve;
+
+class InfluxDbConnectionV1 implements InfluxDbConnection
+{
+ const API_VERSION = 'v1';
+
+ const USER_AGENT = 'gipfl-InfluxDB/0.5';
+
+ /** @var string */
+ protected $baseUrl;
+
+ protected $version;
+
+ /** @var string|null */
+ protected $username;
+
+ /** @var string|null */
+ protected $password;
+
+ protected $curl;
+
+ /**
+ * AsyncInfluxDbWriter constructor.
+ * @param CurlAsync $curl
+ * @param string $baseUrl InfluxDB base URL
+ * @param ?string $username
+ * @param ?string $password
+ */
+ public function __construct(CurlAsync $curl, $baseUrl, $username = null, $password = null)
+ {
+ $this->baseUrl = rtrim($baseUrl, '/');
+ $this->curl = $curl;
+ $this->setUsername($username);
+ $this->setPassword($password);
+ }
+
+ /**
+ * @param string|null $username
+ * @return $this
+ */
+ public function setUsername($username)
+ {
+ $this->username = $username;
+ return $this;
+ }
+
+ /**
+ * @param string|null $password
+ * @return $this
+ */
+ public function setPassword($password)
+ {
+ $this->password = $password;
+ return $this;
+ }
+
+ public function ping($verbose = false)
+ {
+ $params = [];
+ if ($verbose) {
+ $params['verbose'] = 'true';
+ }
+ return $this->getUrl('ping', $params);
+ }
+
+ public function getVersion()
+ {
+ if ($this->version) {
+ return resolve($this->version);
+ }
+
+ return $this->get('ping')->then(function (ResponseInterface $response) {
+ foreach ($response->getHeader('X-Influxdb-Version') as $version) {
+ return $this->version = $version;
+ }
+
+ return null;
+ });
+ }
+
+ public function listDatabases()
+ {
+ return $this->query('SHOW DATABASES')->then(function ($result) {
+ return InfluxDbQueryResult::extractColumn($result);
+ });
+ }
+
+ public function createDatabase($name)
+ {
+ return $this->query('CREATE DATABASE ' . Escape::fieldValue($name))->then(function ($result) {
+ return $result;
+ });
+ }
+
+ /**
+ * only since vX
+ */
+ public function getHealth()
+ {
+ // Works without Auth
+ return $this->getUrl('health');
+ }
+
+ protected function query($query)
+ {
+ if (is_array($query)) {
+ $sendQueries = \array_values($query);
+ } else {
+ $sendQueries = [$query];
+ }
+ if (empty($query)) {
+ throw new \InvalidArgumentException('Cannot run no query');
+ }
+
+ if (preg_match('/^(SELECT|SHOW|ALTER|CREATE|DELETE|DROP|GRANT|KILL|REVOKE) /', $sendQueries[0], $match)) {
+ $queryType = $match[1];
+ } else {
+ throw new \InvalidArgumentException('Unable to detect query type: ' . $sendQueries[0]);
+ }
+ if ($queryType === 'SHOW') {
+ $queryType = 'GET';
+ } elseif ($queryType === 'SELECT') {
+ if (strpos($sendQueries[0], ' INTO ') === false) {
+ $queryType = 'POST';
+ } else {
+ $queryType = 'GET';
+ }
+ } else {
+ $queryType = 'POST';
+ }
+ $prefix = '';
+
+ // TODO: Temporarily disabled, had problems with POST params in the body
+ if ($queryType === 'xPOST') {
+ $headers = ['Content-Type' => 'x-www-form-urlencoded'];
+ $body = \http_build_query(['q' => implode(';', $sendQueries)]);
+ $urlParams = [];
+ $promise = $this->curl->post(
+ $this->url("{$prefix}query", $urlParams),
+ $this->getRequestHeaders() + $headers,
+ $body
+ );
+ } else {
+ $urlParams = ['q' => implode(';', $sendQueries)];
+ $promise = $this->curl->get(
+ $this->url("{$prefix}query", $urlParams),
+ $this->getRequestHeaders()
+ );
+ }
+
+ /** @var Promise $promise */
+ return $promise->then(function (ResponseInterface $response) use ($sendQueries, $query) {
+ $body = $response->getBody();
+ if (! ($response->getStatusCode() < 300)) {
+ throw new \Exception($response->getReasonPhrase());
+ }
+ if (preg_match('#^application/json#', \current($response->getHeader('content-type')))) {
+ $decoded = JsonString::decode((string) $body);
+ } else {
+ throw new \RuntimeException(\sprintf(
+ 'JSON response expected, got %s: %s',
+ current($response->getHeader('content-type')),
+ $body
+ ));
+ }
+ $results = [];
+ foreach ($decoded->results as $result) {
+ if (isset($result->series)) {
+ $results[$result->statement_id] = $result->series[0];
+ } elseif (isset($result->error)) {
+ $results[$result->statement_id] = new \Exception('InfluxDB error: ' . $result->error);
+ } else {
+ $results[$result->statement_id] = null;
+ }
+ }
+ if (\count($results) !== \count($sendQueries)) {
+ throw new \InvalidArgumentException(\sprintf(
+ 'Sent %d statements, but got %d results',
+ \count($sendQueries),
+ \count($results)
+ ));
+ }
+
+ if (is_array($query)) {
+ return \array_combine(\array_keys($query), $results);
+ } else {
+ return $results[0];
+ }
+ });
+ }
+
+ /**
+ * @param string $dbName
+ * @param DataPoint[] $dataPoints
+ * @param string|null $precision ns,u,ms,s,m,h
+ * @return \React\Promise\Promise
+ */
+ public function writeDataPoints($dbName, array $dataPoints, $precision = null)
+ {
+ $body = gzencode(\implode($dataPoints), 6);
+ $params = ['db' => $dbName];
+ if ($precision !== null) {
+ $params['precision'] = $precision;
+ }
+ $headers = [
+ 'X-Request-Id' => Uuid::uuid4()->toString(),
+ 'Content-Encoding' => 'gzip',
+ 'Content-Length' => strlen($body),
+ ];
+ // params['rp'] = $retentionPolicy
+ /** @var Promise $promise */
+ return $this->curl->post(
+ $this->url('write', $params),
+ $this->getRequestHeaders() + $headers,
+ $body,
+ $this->getDefaultCurlOptions()
+ );
+ }
+
+ protected function getDefaultCurlOptions()
+ {
+ return [
+ // Hint: avoid 100/Continue
+ CURLOPT_HTTPHEADER => [
+ 'Expect:',
+ ]
+ ];
+ }
+
+ protected function getRequestHeaders()
+ {
+ $headers = [
+ 'User-Agent' => static::USER_AGENT,
+ ];
+ if ($this->username !== null) {
+ $headers['Authorization'] = 'Basic '
+ . \base64_encode($this->username . ':' . $this->password);
+ }
+
+ return $headers;
+ }
+
+ protected function get($url, $params = null)
+ {
+ return $this->curl->get(
+ $this->url($url, $params),
+ $this->getRequestHeaders()
+ );
+ }
+
+ protected function getRaw($url, $params = null)
+ {
+ /** @var Promise $promise */
+ $promise = $this
+ ->get($url, $params)
+ ->then(function (ResponseInterface $response) {
+ return (string) $response->getBody();
+ });
+
+ return $promise;
+ }
+
+ protected function postRaw($url, $body, $headers = [], $urlParams = [])
+ {
+ /** @var Promise $promise */
+ $promise = $this->curl->post(
+ $this->url($url, $urlParams),
+ $this->getRequestHeaders() + $headers + [
+ 'Content-Type' => 'application/json'
+ ],
+ $body
+ )->then(function (ResponseInterface $response) {
+ return (string) $response->getBody();
+ });
+
+ return $promise;
+ }
+
+ protected function getUrl($url, $params = null)
+ {
+ return $this->getRaw($url, $params)->then(function ($raw) {
+ return JsonString::decode((string) $raw);
+ });
+ }
+
+ protected function postUrl($url, $body, $headers = [], $urlParams = [])
+ {
+ return $this->postRaw($url, JsonString::encode($body), $headers, $urlParams)->then(function ($raw) {
+ return JsonString::decode((string) $raw);
+ });
+ }
+
+ protected function url($path, $params = [])
+ {
+ $url = $this->baseUrl . "/$path";
+ if (! empty($params)) {
+ $url .= '?' . \http_build_query($params);
+ }
+
+ return $url;
+ }
+}
diff --git a/vendor/gipfl/influxdb/src/InfluxDbConnectionV2.php b/vendor/gipfl/influxdb/src/InfluxDbConnectionV2.php
new file mode 100644
index 0000000..d244786
--- /dev/null
+++ b/vendor/gipfl/influxdb/src/InfluxDbConnectionV2.php
@@ -0,0 +1,270 @@
+<?php
+
+namespace gipfl\InfluxDb;
+
+use gipfl\Curl\CurlAsync;
+use gipfl\Json\JsonString;
+use Psr\Http\Message\ResponseInterface;
+use Ramsey\Uuid\Uuid;
+use React\EventLoop\LoopInterface;
+use React\Promise\Promise;
+
+class InfluxDbConnectionV2 implements InfluxDbConnection
+{
+ const API_VERSION = 'v2';
+
+ const USER_AGENT = 'gipfl-InfluxDB/0.5';
+
+ /** @var CurlAsync */
+ protected $curl;
+
+ /** @var string */
+ protected $baseUrl;
+
+ /** @var string|null */
+ protected $token;
+
+ /** @var string|null */
+ protected $org;
+
+ /**
+ * AsyncInfluxDbWriter constructor.
+ * @param $baseUrl string InfluxDB base URL
+ * @param LoopInterface $loop
+ */
+ public function __construct(CurlAsync $curl, $baseUrl, $org, $token)
+ {
+ $this->baseUrl = $baseUrl;
+ $this->setOrg($org);
+ $this->setToken($token);
+ $this->curl = $curl;
+ }
+
+ /**
+ * @param string|null $token
+ * @return $this
+ */
+ public function setToken($token)
+ {
+ $this->token = $token;
+
+ return $this;
+ }
+
+ /**
+ * @param string|null $org
+ * @return $this
+ */
+ public function setOrg($org)
+ {
+ $this->org = $org;
+
+ return $this;
+ }
+
+ public function ping($verbose = false)
+ {
+ $params = [];
+ if ($verbose) {
+ $params['verbose'] = 'true';
+ }
+ return $this->getUrl('ping', $params);
+ }
+
+ public function getVersion()
+ {
+ return $this->getHealth()->then(function ($result) {
+ return $result->version;
+ });
+ }
+
+ public function getMyOrgId()
+ {
+ return $this->getUrl('api/v2/orgs', ['org' => urlencode($this->org)])->then(function ($result) {
+ foreach ($result->orgs as $org) {
+ if ($org->name === $this->org) {
+ return $org->id;
+ }
+ }
+
+ throw new \RuntimeException('Org "' . $this->org . '" not found');
+ });
+ }
+
+ public function listDatabases()
+ {
+ // ->links->self = "/api/v2/buckets?descending=false\u0026limit=2\u0026offset=0"
+ // ->links->next = "next": "/api/v2/buckets?descending=false\u0026limit=2\u0026offset=2"
+ // 100 -> maxlimit
+ return $this->getUrl('api/v2/buckets', ['limit' => 100])->then(function ($result) {
+ $list = [];
+ foreach ($result->buckets as $bucket) {
+ $list[] = $bucket->name;
+ }
+
+ return $list;
+ });
+ }
+
+ public function createDatabase($name)
+ {
+ return $this->getMyOrgId()->then(function ($orgId) use ($name) {
+ return $this->postUrl('api/v2/buckets', [
+ 'orgID' => $orgId,
+ 'name' => $name,
+ 'retentionRules' => [(object) [
+ 'type' => 'expire',
+ 'everySeconds' => 86400 * 7,
+ ]]
+ ]);
+ })->then(function ($result) {
+ return $result;
+ });
+ }
+
+ public function getHealth()
+ {
+ // Works without Auth
+ return $this->getUrl('health');
+ }
+
+ /**
+ * TODO: unfinished
+ * @param $query
+ * @throws \gipfl\Json\JsonEncodeException
+ */
+ protected function query($query)
+ {
+ $prefix = "api/v2/";
+ $headers = ['Content-Type' => 'application/json'];
+ $body = JsonString::encode(['query' => $query]);
+ $urlParams = ['org' => $this->org];
+ }
+
+ /**
+ * @param string $dbName
+ * @param DataPoint[] $dataPoints
+ * @param string|null $precision ns,u,ms,s,m,h
+ * @return \React\Promise\Promise
+ */
+ public function writeDataPoints($dbName, array $dataPoints, $precision = null)
+ {
+ $body = gzencode(\implode($dataPoints), 6);
+ $params = [
+ 'org' => $this->org,
+ 'bucket' => $dbName,
+ // TODO: Figure out, whether 2.0.0 also supports bucket. If so, drop db
+ 'db' => $dbName,
+ ];
+ $headers = [
+ 'X-Request-Id' => Uuid::uuid4()->toString(),
+ 'Content-Encoding' => 'gzip',
+ 'Content-Length' => strlen($body),
+ ];
+ if ($precision !== null) {
+ $params['precision'] = $precision;
+ }
+ // params['rp'] = $retentionPolicy
+ return $this->curl->post(
+ $this->url('api/v2/write', $params),
+ $this->defaultHeaders() + $headers,
+ $body,
+ $this->getDefaultCurlOptions()
+ );
+ }
+
+ protected function getDefaultCurlOptions()
+ {
+ return [
+ // Hint: avoid 100/Continue
+ CURLOPT_HTTPHEADER => [
+ 'Expect:',
+ ]
+ ];
+ }
+
+ protected function defaultHeaders()
+ {
+ $headers = [
+ 'User-Agent' => static::USER_AGENT,
+ ];
+ if ($this->token) {
+ $headers['Authorization'] = 'Token ' . $this->token;
+ }
+
+ return $headers;
+ }
+
+ protected function get($url, $params = null)
+ {
+ return $this->curl->get(
+ $this->url($url, $params),
+ $this->defaultHeaders()
+ );
+ }
+
+ protected function getRaw($url, $params = null)
+ {
+ /** @var Promise $promise */
+ $promise = $this
+ ->get($url, $params)
+ ->then(function (ResponseInterface $response) {
+ if ($response->getStatusCode() < 300) {
+ return (string) $response->getBody();
+ } else {
+ try {
+ $body = JsonString::decode($response->getBody());
+ } catch (\Exception $e) {
+ throw new \Exception($response->getReasonPhrase());
+ }
+ if (isset($body->message)) {
+ throw new \Exception($body->message);
+ } else {
+ throw new \Exception($response->getReasonPhrase());
+ }
+ }
+ });
+
+ return $promise;
+ }
+
+ protected function postRaw($url, $body, $headers = [], $urlParams = [])
+ {
+ /** @var Promise $promise */
+ $promise = $this->curl->post(
+ $this->url($url, $urlParams),
+ $this->defaultHeaders() + $headers + [
+ 'Content-Type' => 'application/json'
+ ],
+ $body
+ )->then(function (ResponseInterface $response) {
+ return (string) $response->getBody();
+ });
+
+ return $promise;
+ }
+
+ protected function getUrl($url, $params = null)
+ {
+ return $this->getRaw($url, $params)->then(function ($raw) {
+ return JsonString::decode((string) $raw);
+ });
+ }
+
+ protected function postUrl($url, $body, $headers = [], $urlParams = [])
+ {
+ return $this->postRaw($url, JsonString::encode($body), $headers, $urlParams)->then(function ($raw) {
+ return JsonString::decode((string) $raw);
+ });
+ }
+
+ protected function url($path, $params = [])
+ {
+ $url = $this->baseUrl . "/$path";
+ if (! empty($params)) {
+ $url .= '?' . \http_build_query($params);
+ }
+
+ return $url;
+ }
+}
diff --git a/vendor/gipfl/influxdb/src/InfluxDbQueryResult.php b/vendor/gipfl/influxdb/src/InfluxDbQueryResult.php
new file mode 100644
index 0000000..0ca6fd1
--- /dev/null
+++ b/vendor/gipfl/influxdb/src/InfluxDbQueryResult.php
@@ -0,0 +1,65 @@
+<?php
+
+namespace gipfl\InfluxDb;
+
+use InvalidArgumentException;
+
+class InfluxDbQueryResult
+{
+ public static function extractColumn($result, $idx = 0)
+ {
+ if (! isset($result->columns)) {
+ print_r($result);
+ exit;
+ }
+ $idx = static::getNumericColumn($idx, $result->columns);
+ $column = [];
+ foreach ($result->values as $row) {
+ $column[] = $row[$idx];
+ }
+
+ return $column;
+ }
+
+ protected static function getNumericColumn($name, $cols)
+ {
+ if (\is_int($name)) {
+ if (isset($cols[$name])) {
+ return $name;
+ }
+ }
+ if (\is_string($name)) {
+ foreach ($cols as $idx => $alias) {
+ if ($name === $alias) {
+ return $idx;
+ }
+ }
+ }
+
+ throw new InvalidArgumentException("There is no '$name' column in the result");
+ }
+
+ protected static function extractPairs($result, $keyColumn = 0, $valueColumn = 1)
+ {
+ $keyColumn = static::getNumericColumn($keyColumn, $result->columns);
+ $valueColumn = static::getNumericColumn($valueColumn, $result->columns);
+ $pairs = [];
+ foreach ($result->values as $row) {
+ $pairs[$row[$keyColumn]] = $row[$valueColumn];
+ }
+
+ return $pairs;
+ }
+
+ protected static function transformResultsTable($table)
+ {
+ // $table->name = 'databases'
+ $cols = $table->columns;
+ $values = [];
+ foreach ($table->values as $row) {
+ $values[] = (object) \array_combine($cols, $row);
+ }
+
+ return $values;
+ }
+}
diff --git a/vendor/gipfl/influxdb/src/LineProtocol.php b/vendor/gipfl/influxdb/src/LineProtocol.php
new file mode 100644
index 0000000..b3b5f4a
--- /dev/null
+++ b/vendor/gipfl/influxdb/src/LineProtocol.php
@@ -0,0 +1,63 @@
+<?php
+
+namespace gipfl\InfluxDb;
+
+use function ksort;
+use function strlen;
+
+abstract class LineProtocol
+{
+ public static function renderMeasurement($measurement, $tags = [], $fields = [], $timestamp = null)
+ {
+ return Escape::measurement($measurement)
+ . static::renderTags($tags)
+ . static::renderFields($fields)
+ . static::renderTimeStamp($timestamp)
+ . "\n";
+ }
+
+ public static function renderTags($tags)
+ {
+ ksort($tags);
+ $string = '';
+ foreach ($tags as $key => $value) {
+ if ($value === null || strlen($value) === 0) {
+ continue;
+ }
+ $string .= ',' . static::renderTag($key, $value);
+ }
+
+ return $string;
+ }
+
+ public static function renderFields($fields)
+ {
+ $string = '';
+ foreach ($fields as $key => $value) {
+ $string .= ',' . static::renderField($key, $value);
+ }
+ $string[0] = ' ';
+
+ return $string;
+ }
+
+ public static function renderTimeStamp($timestamp)
+ {
+ if ($timestamp === null) {
+ return '';
+ } else {
+ return " $timestamp";
+ }
+ }
+
+ public static function renderTag($key, $value)
+ {
+ return Escape::key($key) . '=' . Escape::tagValue($value);
+ }
+
+ public static function renderField($key, $value)
+ {
+
+ return Escape::key($key) . '=' . Escape::fieldValue($value);
+ }
+}