diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 12:44:51 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 12:44:51 +0000 |
commit | a1ec78bf0dc93d0e05e5f066f1949dc3baecea06 (patch) | |
tree | ee596ce1bc9840661386f96f9b8d1f919a106317 /vendor/gipfl/influxdb/src | |
parent | Initial commit. (diff) | |
download | icingaweb2-module-incubator-a1ec78bf0dc93d0e05e5f066f1949dc3baecea06.tar.xz icingaweb2-module-incubator-a1ec78bf0dc93d0e05e5f066f1949dc3baecea06.zip |
Adding upstream version 0.20.0.upstream/0.20.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/gipfl/influxdb/src')
-rw-r--r-- | vendor/gipfl/influxdb/src/ChunkedInfluxDbWriter.php | 158 | ||||
-rw-r--r-- | vendor/gipfl/influxdb/src/DataPoint.php | 63 | ||||
-rw-r--r-- | vendor/gipfl/influxdb/src/Escape.php | 67 | ||||
-rw-r--r-- | vendor/gipfl/influxdb/src/InfluxDbConnection.php | 24 | ||||
-rw-r--r-- | vendor/gipfl/influxdb/src/InfluxDbConnectionFactory.php | 38 | ||||
-rw-r--r-- | vendor/gipfl/influxdb/src/InfluxDbConnectionV1.php | 311 | ||||
-rw-r--r-- | vendor/gipfl/influxdb/src/InfluxDbConnectionV2.php | 270 | ||||
-rw-r--r-- | vendor/gipfl/influxdb/src/InfluxDbQueryResult.php | 65 | ||||
-rw-r--r-- | vendor/gipfl/influxdb/src/LineProtocol.php | 63 |
9 files changed, 1059 insertions, 0 deletions
diff --git a/vendor/gipfl/influxdb/src/ChunkedInfluxDbWriter.php b/vendor/gipfl/influxdb/src/ChunkedInfluxDbWriter.php new file mode 100644 index 0000000..37473a7 --- /dev/null +++ b/vendor/gipfl/influxdb/src/ChunkedInfluxDbWriter.php @@ -0,0 +1,158 @@ +<?php + +namespace gipfl\InfluxDb; + +use gipfl\Curl\RequestError; +use Psr\Http\Message\ResponseInterface; +use Psr\Log\LoggerAwareInterface; +use Psr\Log\LoggerAwareTrait; +use Psr\Log\NullLogger; +use React\EventLoop\LoopInterface; +use React\EventLoop\TimerInterface; + +/** + * Gives no result, enqueue and forget + */ +class ChunkedInfluxDbWriter implements LoggerAwareInterface +{ + use LoggerAwareTrait; + + const DEFAULT_BUFFER_SIZE = 5000; + + const DEFAULT_FLUSH_INTERVAL = 0.2; + + const DEFAULT_PRECISION = 's'; + + /** @var int */ + protected $bufferSize = self::DEFAULT_BUFFER_SIZE; + + /** @var float */ + protected $flushInterval = self::DEFAULT_FLUSH_INTERVAL; + + /** @var string */ + protected $precision = self::DEFAULT_PRECISION; + + /** @var DataPoint[] */ + protected $buffer = []; + + /** @var InfluxDbConnection */ + protected $connection; + + /** @var string */ + protected $dbName; + + /** @var LoopInterface */ + protected $loop; + + /** @var ?TimerInterface */ + protected $flushTimer; + + public function __construct(InfluxDbConnection $connection, $dbName, LoopInterface $loop) + { + $this->setLogger(new NullLogger()); + $this->connection = $connection; + $this->dbName = $dbName; + $this->loop = $loop; + } + + /** + * @param DataPoint $point + */ + public function enqueue(DataPoint $point) + { + $this->buffer[] = $point; + $count = count($this->buffer); + if ($count >= $this->bufferSize) { + $this->flush(); + } else { + $this->startFlushTimer(); + } + } + + /** + * @param int $bufferSize + * @return ChunkedInfluxDbWriter + */ + public function setBufferSize($bufferSize) + { + $this->bufferSize = $bufferSize; + return $this; + } + + /** + * @param float $flushInterval + * @return ChunkedInfluxDbWriter + */ + public function setFlushInterval($flushInterval) + { + $this->flushInterval = $flushInterval; + return $this; + } + + /** + * @param string $precision ns,u,ms,s,m,h + * @return ChunkedInfluxDbWriter + */ + public function setPrecision($precision) + { + $this->precision = $precision; + return $this; + } + + public function flush() + { + $buffer = $this->buffer; + $this->buffer = []; + $this->stopFlushTimer(); + $this->logger->debug(sprintf('Flushing InfluxDB buffer, sending %d data points', count($buffer))); + $start = microtime(true); + $this->connection->writeDataPoints($this->dbName, $buffer, $this->precision) + ->then(function (ResponseInterface $response) use ($start) { + $code = $response->getStatusCode(); + $duration = (microtime(true) - $start) * 1000; + if ($code > 199 && $code < 300) { + $this->logger->debug(sprintf('Got response from InfluxDB after %.2Fms', $duration)); + } else { + $this->logger->error(sprintf( + 'Got unexpected %d from InfluxDB after %.2Fms: %s', + $code, + $duration, + $response->getReasonPhrase() + )); + } + }, function (RequestError $e) { + $this->logger->error($e->getMessage()); + })->done(); + } + + public function stop() + { + $this->flush(); + } + + protected function startFlushTimer() + { + if ($this->flushTimer === null) { + $this->flushTimer = $this->loop->addPeriodicTimer($this->flushInterval, function () { + if (! empty($this->buffer)) { + $this->flush(); + } + }); + } + } + + protected function stopFlushTimer() + { + if ($this->flushTimer) { + $this->loop->cancelTimer($this->flushTimer); + $this->flushTimer = null; + } + } + + public function __destruct() + { + $this->stopFlushTimer(); + $this->loop = null; + $this->connection = null; + } +} diff --git a/vendor/gipfl/influxdb/src/DataPoint.php b/vendor/gipfl/influxdb/src/DataPoint.php new file mode 100644 index 0000000..f272206 --- /dev/null +++ b/vendor/gipfl/influxdb/src/DataPoint.php @@ -0,0 +1,63 @@ +<?php + +namespace gipfl\InfluxDb; + +use InvalidArgumentException; +use function array_key_exists; +use function array_merge; +use function is_array; +use function is_object; +use function ksort; + +class DataPoint +{ + protected $timestamp; + + protected $measurement; + + protected $tags = []; + + protected $fields; + + public function __construct($measurement, $tags = [], $fields = [], $timestamp = null) + { + $this->measurement = (string) $measurement; + if ($timestamp !== null) { + $this->timestamp = $timestamp; + } + + if (! empty($tags)) { + $this->addTags($tags); + } + + if (is_array($fields) || is_object($fields)) { + $this->fields = (array) $fields; + } else { + $this->fields = ['value' => $fields]; + } + + if (empty($this->fields)) { + throw new InvalidArgumentException('At least one field/value is required'); + } + } + + public function addTags($tags) + { + $this->tags = array_merge($this->tags, (array) $tags); + ksort($this->tags); + } + + public function getTag($name, $default = null) + { + if (array_key_exists($name, $this->tags)) { + return $this->tags[$name]; + } else { + return $default; + } + } + + public function __toString() + { + return LineProtocol::renderMeasurement($this->measurement, $this->tags, $this->fields, $this->timestamp); + } +} diff --git a/vendor/gipfl/influxdb/src/Escape.php b/vendor/gipfl/influxdb/src/Escape.php new file mode 100644 index 0000000..e6cb555 --- /dev/null +++ b/vendor/gipfl/influxdb/src/Escape.php @@ -0,0 +1,67 @@ +<?php + +namespace gipfl\InfluxDb; + +use InvalidArgumentException; +use function addcslashes; +use function ctype_digit; +use function is_bool; +use function is_int; +use function is_null; +use function preg_match; +use function strpos; + +abstract class Escape +{ + const ESCAPE_COMMA_SPACE = ' ,\\'; + + const ESCAPE_COMMA_EQUAL_SPACE = ' =,\\'; + + const ESCAPE_DOUBLE_QUOTES = '"\\'; + + const NULL = 'null'; + + const TRUE = 'true'; + + const FALSE = 'false'; + + public static function measurement($value) + { + static::assertNoNewline($value); + return addcslashes($value, self::ESCAPE_COMMA_SPACE); + } + + public static function key($value) + { + static::assertNoNewline($value); + return addcslashes($value, self::ESCAPE_COMMA_EQUAL_SPACE); + } + + public static function tagValue($value) + { + static::assertNoNewline($value); + return addcslashes($value, self::ESCAPE_COMMA_EQUAL_SPACE); + } + + public static function fieldValue($value) + { + // Faster checks first + if (is_int($value) || ctype_digit($value) || preg_match('/^-\d+$/', $value)) { + return "{$value}i"; + } elseif (is_bool($value)) { + return $value ? self::TRUE : self::FALSE; + } elseif (is_null($value)) { + return self::NULL; + } else { + static::assertNoNewline($value); + return '"' . addcslashes($value, self::ESCAPE_DOUBLE_QUOTES) . '"'; + } + } + + protected static function assertNoNewline($value) + { + if (strpos($value, "\n") !== false) { + throw new InvalidArgumentException('Newlines are forbidden in InfluxDB line protocol'); + } + } +} diff --git a/vendor/gipfl/influxdb/src/InfluxDbConnection.php b/vendor/gipfl/influxdb/src/InfluxDbConnection.php new file mode 100644 index 0000000..d20944a --- /dev/null +++ b/vendor/gipfl/influxdb/src/InfluxDbConnection.php @@ -0,0 +1,24 @@ +<?php + +namespace gipfl\InfluxDb; + +interface InfluxDbConnection +{ + public function ping($verbose = false); + + public function getVersion(); + + public function listDatabases(); + + public function createDatabase($name); + + public function getHealth(); + + /** + * @param string $dbName + * @param DataPoint[] $dataPoints + * @param string|null $precision ns,u,ms,s,m,h + * @return \React\Promise\Promise + */ + public function writeDataPoints($dbName, array $dataPoints, $precision = null); +} diff --git a/vendor/gipfl/influxdb/src/InfluxDbConnectionFactory.php b/vendor/gipfl/influxdb/src/InfluxDbConnectionFactory.php new file mode 100644 index 0000000..f260010 --- /dev/null +++ b/vendor/gipfl/influxdb/src/InfluxDbConnectionFactory.php @@ -0,0 +1,38 @@ +<?php + +namespace gipfl\InfluxDb; + +use gipfl\Curl\CurlAsync; +use React\EventLoop\LoopInterface; +use React\Promise\Promise; +use RuntimeException; + +abstract class InfluxDbConnectionFactory +{ + /** + * AsyncInfluxDbWriter constructor. + * @param LoopInterface $loop + * @param $baseUrl string InfluxDB base URL + * @param string|null $username + * @param string|null $password + * @return Promise <InfluxDbConnection> + */ + public static function create(CurlAsync $curl, $baseUrl, $username = null, $password = null) + { + $v1 = new InfluxDbConnectionV1($curl, $baseUrl); + return $v1->getVersion()->then(function ($version) use ($baseUrl, $username, $password, $curl, $v1) { + if ($version === null || preg_match('/^v?2\./', $version)) { + $v2 = new InfluxDbConnectionV2($curl, $baseUrl, $username, $password); + return $v2->getVersion()->then(function ($version) use ($v2) { + if ($version === null) { + throw new RuntimeException('Unable to detect InfluxDb version'); + } else { + return $v2; + } + }); + } else { + return $v1; + } + }); + } +} diff --git a/vendor/gipfl/influxdb/src/InfluxDbConnectionV1.php b/vendor/gipfl/influxdb/src/InfluxDbConnectionV1.php new file mode 100644 index 0000000..0b674c2 --- /dev/null +++ b/vendor/gipfl/influxdb/src/InfluxDbConnectionV1.php @@ -0,0 +1,311 @@ +<?php + +namespace gipfl\InfluxDb; + +use gipfl\Curl\CurlAsync; +use gipfl\Json\JsonString; +use Psr\Http\Message\ResponseInterface; +use Ramsey\Uuid\Uuid; +use React\Promise\Promise; +use function React\Promise\resolve; + +class InfluxDbConnectionV1 implements InfluxDbConnection +{ + const API_VERSION = 'v1'; + + const USER_AGENT = 'gipfl-InfluxDB/0.5'; + + /** @var string */ + protected $baseUrl; + + protected $version; + + /** @var string|null */ + protected $username; + + /** @var string|null */ + protected $password; + + protected $curl; + + /** + * AsyncInfluxDbWriter constructor. + * @param CurlAsync $curl + * @param string $baseUrl InfluxDB base URL + * @param ?string $username + * @param ?string $password + */ + public function __construct(CurlAsync $curl, $baseUrl, $username = null, $password = null) + { + $this->baseUrl = rtrim($baseUrl, '/'); + $this->curl = $curl; + $this->setUsername($username); + $this->setPassword($password); + } + + /** + * @param string|null $username + * @return $this + */ + public function setUsername($username) + { + $this->username = $username; + return $this; + } + + /** + * @param string|null $password + * @return $this + */ + public function setPassword($password) + { + $this->password = $password; + return $this; + } + + public function ping($verbose = false) + { + $params = []; + if ($verbose) { + $params['verbose'] = 'true'; + } + return $this->getUrl('ping', $params); + } + + public function getVersion() + { + if ($this->version) { + return resolve($this->version); + } + + return $this->get('ping')->then(function (ResponseInterface $response) { + foreach ($response->getHeader('X-Influxdb-Version') as $version) { + return $this->version = $version; + } + + return null; + }); + } + + public function listDatabases() + { + return $this->query('SHOW DATABASES')->then(function ($result) { + return InfluxDbQueryResult::extractColumn($result); + }); + } + + public function createDatabase($name) + { + return $this->query('CREATE DATABASE ' . Escape::fieldValue($name))->then(function ($result) { + return $result; + }); + } + + /** + * only since vX + */ + public function getHealth() + { + // Works without Auth + return $this->getUrl('health'); + } + + protected function query($query) + { + if (is_array($query)) { + $sendQueries = \array_values($query); + } else { + $sendQueries = [$query]; + } + if (empty($query)) { + throw new \InvalidArgumentException('Cannot run no query'); + } + + if (preg_match('/^(SELECT|SHOW|ALTER|CREATE|DELETE|DROP|GRANT|KILL|REVOKE) /', $sendQueries[0], $match)) { + $queryType = $match[1]; + } else { + throw new \InvalidArgumentException('Unable to detect query type: ' . $sendQueries[0]); + } + if ($queryType === 'SHOW') { + $queryType = 'GET'; + } elseif ($queryType === 'SELECT') { + if (strpos($sendQueries[0], ' INTO ') === false) { + $queryType = 'POST'; + } else { + $queryType = 'GET'; + } + } else { + $queryType = 'POST'; + } + $prefix = ''; + + // TODO: Temporarily disabled, had problems with POST params in the body + if ($queryType === 'xPOST') { + $headers = ['Content-Type' => 'x-www-form-urlencoded']; + $body = \http_build_query(['q' => implode(';', $sendQueries)]); + $urlParams = []; + $promise = $this->curl->post( + $this->url("{$prefix}query", $urlParams), + $this->getRequestHeaders() + $headers, + $body + ); + } else { + $urlParams = ['q' => implode(';', $sendQueries)]; + $promise = $this->curl->get( + $this->url("{$prefix}query", $urlParams), + $this->getRequestHeaders() + ); + } + + /** @var Promise $promise */ + return $promise->then(function (ResponseInterface $response) use ($sendQueries, $query) { + $body = $response->getBody(); + if (! ($response->getStatusCode() < 300)) { + throw new \Exception($response->getReasonPhrase()); + } + if (preg_match('#^application/json#', \current($response->getHeader('content-type')))) { + $decoded = JsonString::decode((string) $body); + } else { + throw new \RuntimeException(\sprintf( + 'JSON response expected, got %s: %s', + current($response->getHeader('content-type')), + $body + )); + } + $results = []; + foreach ($decoded->results as $result) { + if (isset($result->series)) { + $results[$result->statement_id] = $result->series[0]; + } elseif (isset($result->error)) { + $results[$result->statement_id] = new \Exception('InfluxDB error: ' . $result->error); + } else { + $results[$result->statement_id] = null; + } + } + if (\count($results) !== \count($sendQueries)) { + throw new \InvalidArgumentException(\sprintf( + 'Sent %d statements, but got %d results', + \count($sendQueries), + \count($results) + )); + } + + if (is_array($query)) { + return \array_combine(\array_keys($query), $results); + } else { + return $results[0]; + } + }); + } + + /** + * @param string $dbName + * @param DataPoint[] $dataPoints + * @param string|null $precision ns,u,ms,s,m,h + * @return \React\Promise\Promise + */ + public function writeDataPoints($dbName, array $dataPoints, $precision = null) + { + $body = gzencode(\implode($dataPoints), 6); + $params = ['db' => $dbName]; + if ($precision !== null) { + $params['precision'] = $precision; + } + $headers = [ + 'X-Request-Id' => Uuid::uuid4()->toString(), + 'Content-Encoding' => 'gzip', + 'Content-Length' => strlen($body), + ]; + // params['rp'] = $retentionPolicy + /** @var Promise $promise */ + return $this->curl->post( + $this->url('write', $params), + $this->getRequestHeaders() + $headers, + $body, + $this->getDefaultCurlOptions() + ); + } + + protected function getDefaultCurlOptions() + { + return [ + // Hint: avoid 100/Continue + CURLOPT_HTTPHEADER => [ + 'Expect:', + ] + ]; + } + + protected function getRequestHeaders() + { + $headers = [ + 'User-Agent' => static::USER_AGENT, + ]; + if ($this->username !== null) { + $headers['Authorization'] = 'Basic ' + . \base64_encode($this->username . ':' . $this->password); + } + + return $headers; + } + + protected function get($url, $params = null) + { + return $this->curl->get( + $this->url($url, $params), + $this->getRequestHeaders() + ); + } + + protected function getRaw($url, $params = null) + { + /** @var Promise $promise */ + $promise = $this + ->get($url, $params) + ->then(function (ResponseInterface $response) { + return (string) $response->getBody(); + }); + + return $promise; + } + + protected function postRaw($url, $body, $headers = [], $urlParams = []) + { + /** @var Promise $promise */ + $promise = $this->curl->post( + $this->url($url, $urlParams), + $this->getRequestHeaders() + $headers + [ + 'Content-Type' => 'application/json' + ], + $body + )->then(function (ResponseInterface $response) { + return (string) $response->getBody(); + }); + + return $promise; + } + + protected function getUrl($url, $params = null) + { + return $this->getRaw($url, $params)->then(function ($raw) { + return JsonString::decode((string) $raw); + }); + } + + protected function postUrl($url, $body, $headers = [], $urlParams = []) + { + return $this->postRaw($url, JsonString::encode($body), $headers, $urlParams)->then(function ($raw) { + return JsonString::decode((string) $raw); + }); + } + + protected function url($path, $params = []) + { + $url = $this->baseUrl . "/$path"; + if (! empty($params)) { + $url .= '?' . \http_build_query($params); + } + + return $url; + } +} diff --git a/vendor/gipfl/influxdb/src/InfluxDbConnectionV2.php b/vendor/gipfl/influxdb/src/InfluxDbConnectionV2.php new file mode 100644 index 0000000..d244786 --- /dev/null +++ b/vendor/gipfl/influxdb/src/InfluxDbConnectionV2.php @@ -0,0 +1,270 @@ +<?php + +namespace gipfl\InfluxDb; + +use gipfl\Curl\CurlAsync; +use gipfl\Json\JsonString; +use Psr\Http\Message\ResponseInterface; +use Ramsey\Uuid\Uuid; +use React\EventLoop\LoopInterface; +use React\Promise\Promise; + +class InfluxDbConnectionV2 implements InfluxDbConnection +{ + const API_VERSION = 'v2'; + + const USER_AGENT = 'gipfl-InfluxDB/0.5'; + + /** @var CurlAsync */ + protected $curl; + + /** @var string */ + protected $baseUrl; + + /** @var string|null */ + protected $token; + + /** @var string|null */ + protected $org; + + /** + * AsyncInfluxDbWriter constructor. + * @param $baseUrl string InfluxDB base URL + * @param LoopInterface $loop + */ + public function __construct(CurlAsync $curl, $baseUrl, $org, $token) + { + $this->baseUrl = $baseUrl; + $this->setOrg($org); + $this->setToken($token); + $this->curl = $curl; + } + + /** + * @param string|null $token + * @return $this + */ + public function setToken($token) + { + $this->token = $token; + + return $this; + } + + /** + * @param string|null $org + * @return $this + */ + public function setOrg($org) + { + $this->org = $org; + + return $this; + } + + public function ping($verbose = false) + { + $params = []; + if ($verbose) { + $params['verbose'] = 'true'; + } + return $this->getUrl('ping', $params); + } + + public function getVersion() + { + return $this->getHealth()->then(function ($result) { + return $result->version; + }); + } + + public function getMyOrgId() + { + return $this->getUrl('api/v2/orgs', ['org' => urlencode($this->org)])->then(function ($result) { + foreach ($result->orgs as $org) { + if ($org->name === $this->org) { + return $org->id; + } + } + + throw new \RuntimeException('Org "' . $this->org . '" not found'); + }); + } + + public function listDatabases() + { + // ->links->self = "/api/v2/buckets?descending=false\u0026limit=2\u0026offset=0" + // ->links->next = "next": "/api/v2/buckets?descending=false\u0026limit=2\u0026offset=2" + // 100 -> maxlimit + return $this->getUrl('api/v2/buckets', ['limit' => 100])->then(function ($result) { + $list = []; + foreach ($result->buckets as $bucket) { + $list[] = $bucket->name; + } + + return $list; + }); + } + + public function createDatabase($name) + { + return $this->getMyOrgId()->then(function ($orgId) use ($name) { + return $this->postUrl('api/v2/buckets', [ + 'orgID' => $orgId, + 'name' => $name, + 'retentionRules' => [(object) [ + 'type' => 'expire', + 'everySeconds' => 86400 * 7, + ]] + ]); + })->then(function ($result) { + return $result; + }); + } + + public function getHealth() + { + // Works without Auth + return $this->getUrl('health'); + } + + /** + * TODO: unfinished + * @param $query + * @throws \gipfl\Json\JsonEncodeException + */ + protected function query($query) + { + $prefix = "api/v2/"; + $headers = ['Content-Type' => 'application/json']; + $body = JsonString::encode(['query' => $query]); + $urlParams = ['org' => $this->org]; + } + + /** + * @param string $dbName + * @param DataPoint[] $dataPoints + * @param string|null $precision ns,u,ms,s,m,h + * @return \React\Promise\Promise + */ + public function writeDataPoints($dbName, array $dataPoints, $precision = null) + { + $body = gzencode(\implode($dataPoints), 6); + $params = [ + 'org' => $this->org, + 'bucket' => $dbName, + // TODO: Figure out, whether 2.0.0 also supports bucket. If so, drop db + 'db' => $dbName, + ]; + $headers = [ + 'X-Request-Id' => Uuid::uuid4()->toString(), + 'Content-Encoding' => 'gzip', + 'Content-Length' => strlen($body), + ]; + if ($precision !== null) { + $params['precision'] = $precision; + } + // params['rp'] = $retentionPolicy + return $this->curl->post( + $this->url('api/v2/write', $params), + $this->defaultHeaders() + $headers, + $body, + $this->getDefaultCurlOptions() + ); + } + + protected function getDefaultCurlOptions() + { + return [ + // Hint: avoid 100/Continue + CURLOPT_HTTPHEADER => [ + 'Expect:', + ] + ]; + } + + protected function defaultHeaders() + { + $headers = [ + 'User-Agent' => static::USER_AGENT, + ]; + if ($this->token) { + $headers['Authorization'] = 'Token ' . $this->token; + } + + return $headers; + } + + protected function get($url, $params = null) + { + return $this->curl->get( + $this->url($url, $params), + $this->defaultHeaders() + ); + } + + protected function getRaw($url, $params = null) + { + /** @var Promise $promise */ + $promise = $this + ->get($url, $params) + ->then(function (ResponseInterface $response) { + if ($response->getStatusCode() < 300) { + return (string) $response->getBody(); + } else { + try { + $body = JsonString::decode($response->getBody()); + } catch (\Exception $e) { + throw new \Exception($response->getReasonPhrase()); + } + if (isset($body->message)) { + throw new \Exception($body->message); + } else { + throw new \Exception($response->getReasonPhrase()); + } + } + }); + + return $promise; + } + + protected function postRaw($url, $body, $headers = [], $urlParams = []) + { + /** @var Promise $promise */ + $promise = $this->curl->post( + $this->url($url, $urlParams), + $this->defaultHeaders() + $headers + [ + 'Content-Type' => 'application/json' + ], + $body + )->then(function (ResponseInterface $response) { + return (string) $response->getBody(); + }); + + return $promise; + } + + protected function getUrl($url, $params = null) + { + return $this->getRaw($url, $params)->then(function ($raw) { + return JsonString::decode((string) $raw); + }); + } + + protected function postUrl($url, $body, $headers = [], $urlParams = []) + { + return $this->postRaw($url, JsonString::encode($body), $headers, $urlParams)->then(function ($raw) { + return JsonString::decode((string) $raw); + }); + } + + protected function url($path, $params = []) + { + $url = $this->baseUrl . "/$path"; + if (! empty($params)) { + $url .= '?' . \http_build_query($params); + } + + return $url; + } +} diff --git a/vendor/gipfl/influxdb/src/InfluxDbQueryResult.php b/vendor/gipfl/influxdb/src/InfluxDbQueryResult.php new file mode 100644 index 0000000..0ca6fd1 --- /dev/null +++ b/vendor/gipfl/influxdb/src/InfluxDbQueryResult.php @@ -0,0 +1,65 @@ +<?php + +namespace gipfl\InfluxDb; + +use InvalidArgumentException; + +class InfluxDbQueryResult +{ + public static function extractColumn($result, $idx = 0) + { + if (! isset($result->columns)) { + print_r($result); + exit; + } + $idx = static::getNumericColumn($idx, $result->columns); + $column = []; + foreach ($result->values as $row) { + $column[] = $row[$idx]; + } + + return $column; + } + + protected static function getNumericColumn($name, $cols) + { + if (\is_int($name)) { + if (isset($cols[$name])) { + return $name; + } + } + if (\is_string($name)) { + foreach ($cols as $idx => $alias) { + if ($name === $alias) { + return $idx; + } + } + } + + throw new InvalidArgumentException("There is no '$name' column in the result"); + } + + protected static function extractPairs($result, $keyColumn = 0, $valueColumn = 1) + { + $keyColumn = static::getNumericColumn($keyColumn, $result->columns); + $valueColumn = static::getNumericColumn($valueColumn, $result->columns); + $pairs = []; + foreach ($result->values as $row) { + $pairs[$row[$keyColumn]] = $row[$valueColumn]; + } + + return $pairs; + } + + protected static function transformResultsTable($table) + { + // $table->name = 'databases' + $cols = $table->columns; + $values = []; + foreach ($table->values as $row) { + $values[] = (object) \array_combine($cols, $row); + } + + return $values; + } +} diff --git a/vendor/gipfl/influxdb/src/LineProtocol.php b/vendor/gipfl/influxdb/src/LineProtocol.php new file mode 100644 index 0000000..b3b5f4a --- /dev/null +++ b/vendor/gipfl/influxdb/src/LineProtocol.php @@ -0,0 +1,63 @@ +<?php + +namespace gipfl\InfluxDb; + +use function ksort; +use function strlen; + +abstract class LineProtocol +{ + public static function renderMeasurement($measurement, $tags = [], $fields = [], $timestamp = null) + { + return Escape::measurement($measurement) + . static::renderTags($tags) + . static::renderFields($fields) + . static::renderTimeStamp($timestamp) + . "\n"; + } + + public static function renderTags($tags) + { + ksort($tags); + $string = ''; + foreach ($tags as $key => $value) { + if ($value === null || strlen($value) === 0) { + continue; + } + $string .= ',' . static::renderTag($key, $value); + } + + return $string; + } + + public static function renderFields($fields) + { + $string = ''; + foreach ($fields as $key => $value) { + $string .= ',' . static::renderField($key, $value); + } + $string[0] = ' '; + + return $string; + } + + public static function renderTimeStamp($timestamp) + { + if ($timestamp === null) { + return ''; + } else { + return " $timestamp"; + } + } + + public static function renderTag($key, $value) + { + return Escape::key($key) . '=' . Escape::tagValue($value); + } + + public static function renderField($key, $value) + { + + return Escape::key($key) . '=' . Escape::fieldValue($value); + } +} |