summaryrefslogtreecommitdiffstats
path: root/library/Director/Import/Import.php
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--library/Director/Import/Import.php481
1 files changed, 481 insertions, 0 deletions
diff --git a/library/Director/Import/Import.php b/library/Director/Import/Import.php
new file mode 100644
index 0000000..f82454d
--- /dev/null
+++ b/library/Director/Import/Import.php
@@ -0,0 +1,481 @@
+<?php
+
+namespace Icinga\Module\Director\Import;
+
+use Exception;
+use Icinga\Application\Benchmark;
+use Icinga\Exception\IcingaException;
+use Icinga\Module\Director\Data\RecursiveUtf8Validator;
+use Icinga\Module\Director\Db;
+use Icinga\Module\Director\Hook\ImportSourceHook;
+use Icinga\Module\Director\Objects\ImportSource;
+use Icinga\Module\Director\Util;
+use stdClass;
+
+class Import
+{
+ /**
+ * @var ImportSource
+ */
+ protected $source;
+
+ /**
+ * @var Db
+ */
+ protected $connection;
+
+ /**
+ * @var \Zend_Db_Adapter_Abstract
+ */
+ protected $db;
+
+ /**
+ * Raw data that should be imported, array of stdClass objects
+ *
+ * @var array
+ */
+ protected $data;
+
+ /**
+ * Checksum of the rowset that should be imported
+ *
+ * @var string
+ */
+ private $rowsetChecksum;
+
+ /**
+ * Checksum-indexed rows
+ *
+ * @var array
+ */
+ private $rows;
+
+ /**
+ * Checksum-indexed row -> property
+ *
+ * @var array
+ */
+ private $rowProperties;
+
+ /**
+ * Whether this rowset exists, for caching purposes
+ *
+ * @var boolean
+ */
+ private $rowsetExists;
+
+ protected $properties = array();
+
+ /**
+ * Checksums of all rows
+ */
+ private $rowChecksums;
+
+ public function __construct(ImportSource $source)
+ {
+ $this->source = $source;
+ $this->connection = $source->getConnection();
+ $this->db = $this->connection->getDbAdapter();
+ }
+
+ /**
+ * Whether this import provides modified data
+ *
+ * @return boolean
+ */
+ public function providesChanges()
+ {
+ return ! $this->rowsetExists()
+ || ! $this->lastRowsetIs($this->rowsetChecksum());
+ }
+
+ /**
+ * Trigger an import run
+ *
+ * @return int Last import run ID
+ */
+ public function run()
+ {
+ if ($this->providesChanges() && ! $this->rowsetExists()) {
+ $this->storeRowset();
+ }
+
+ $this->db->insert(
+ 'import_run',
+ array(
+ 'source_id' => $this->source->get('id'),
+ 'rowset_checksum' => $this->quoteBinary($this->rowsetChecksum()),
+ 'start_time' => date('Y-m-d H:i:s'),
+ 'succeeded' => 'y'
+ )
+ );
+ if ($this->connection->isPgsql()) {
+ return $this->db->lastInsertId('import_run', 'id');
+ } else {
+ return $this->db->lastInsertId();
+ }
+ }
+
+ /**
+ * Whether there are no rows to be fetched from import source
+ *
+ * @return boolean
+ */
+ public function isEmpty()
+ {
+ $rows = $this->checksummedRows();
+ return empty($rows);
+ }
+
+ /**
+ * Checksum of all available rows
+ *
+ * @return string
+ */
+ protected function & rowsetChecksum()
+ {
+ if ($this->rowsetChecksum === null) {
+ $this->prepareChecksummedRows();
+ }
+
+ return $this->rowsetChecksum;
+ }
+
+ /**
+ * All rows
+ *
+ * @return array
+ */
+ protected function & checksummedRows()
+ {
+ if ($this->rows === null) {
+ $this->prepareChecksummedRows();
+ }
+
+ return $this->rows;
+ }
+
+ /**
+ * Checksum of all available rows
+ *
+ * @return array
+ */
+ protected function & rawData()
+ {
+ if ($this->data === null) {
+ $this->data = ImportSourceHook::forImportSource(
+ $this->source
+ )->fetchData();
+ Benchmark::measure('Fetched all data from Import Source');
+ $this->source->applyModifiers($this->data);
+ Benchmark::measure('Applied Property Modifiers to imported data');
+ }
+
+ return $this->data;
+ }
+
+ /**
+ * Prepare and remember an ImportedProperty
+ *
+ * @param string $key
+ * @param mixed $rawValue
+ *
+ * @return array
+ */
+ protected function prepareImportedProperty($key, $rawValue)
+ {
+ if (is_array($rawValue) || is_bool($rawValue) || is_int($rawValue) || is_float($rawValue)) {
+ $value = json_encode($rawValue);
+ $format = 'json';
+ } elseif ($rawValue instanceof stdClass) {
+ $value = json_encode($this->sortObject($rawValue));
+ $format = 'json';
+ } else {
+ $value = $rawValue;
+ $format = 'string';
+ }
+
+ $checksum = sha1(sprintf('%s=(%s)%s', $key, $format, $value), true);
+
+ if (! array_key_exists($checksum, $this->properties)) {
+ $this->properties[$checksum] = array(
+ 'checksum' => $this->quoteBinary($checksum),
+ 'property_name' => $key,
+ 'property_value' => $value,
+ 'format' => $format
+ );
+ }
+
+ return $this->properties[$checksum];
+ }
+
+ /**
+ * Walk through each row, prepare properties and calculate checksums
+ */
+ protected function prepareChecksummedRows()
+ {
+ $keyColumn = $this->source->get('key_column');
+ $this->rows = array();
+ $this->rowProperties = array();
+ $objects = array();
+ $rowCount = 0;
+
+ foreach ($this->rawData() as $row) {
+ $rowCount++;
+
+ // Key column must be set
+ if (! isset($row->$keyColumn)) {
+ throw new IcingaException(
+ 'No key column "%s" in row %d',
+ $keyColumn,
+ $rowCount
+ );
+ }
+
+ $object_name = $row->$keyColumn;
+
+ // Check for name collision
+ if (array_key_exists($object_name, $objects)) {
+ throw new IcingaException(
+ 'Duplicate entry: %s',
+ $object_name
+ );
+ }
+
+ $rowChecksums = array();
+ $keys = array_keys((array) $row);
+ sort($keys);
+
+ foreach ($keys as $key) {
+ // TODO: Specify how to treat NULL values. Ignoring for now.
+ // One option might be to import null (checksum '(null)')
+ // and to provide a flag at sync time
+ if ($row->$key === null) {
+ continue;
+ }
+
+ $property = $this->prepareImportedProperty($key, $row->$key);
+ $rowChecksums[] = $property['checksum'];
+ }
+
+ $checksum = sha1($object_name . ';' . implode(';', $rowChecksums), true);
+ if (array_key_exists($checksum, $this->rows)) {
+ die('WTF, collision?');
+ }
+
+ $this->rows[$checksum] = array(
+ 'checksum' => $this->quoteBinary($checksum),
+ 'object_name' => $object_name
+ );
+
+ $this->rowProperties[$checksum] = $rowChecksums;
+
+ $objects[$object_name] = $checksum;
+ }
+
+ $this->rowChecksums = array_keys($this->rows);
+ $this->rowsetChecksum = sha1(implode(';', $this->rowChecksums), true);
+ return $this;
+ }
+
+ /**
+ * Store our new rowset
+ */
+ protected function storeRowset()
+ {
+ $db = $this->db;
+ $rowset = $this->rowsetChecksum();
+ $rows = $this->checksummedRows();
+
+ $db->beginTransaction();
+
+ try {
+ if ($this->isEmpty()) {
+ $newRows = array();
+ $newProperties = array();
+ } else {
+ $newRows = $this->newChecksums('imported_row', $this->rowChecksums);
+ $newProperties = $this->newChecksums('imported_property', array_keys($this->properties));
+ }
+
+ $db->insert('imported_rowset', array('checksum' => $this->quoteBinary($rowset)));
+
+ foreach ($newProperties as $checksum) {
+ $db->insert('imported_property', $this->properties[$checksum]);
+ }
+
+ foreach ($newRows as $row) {
+ try {
+ $db->insert('imported_row', $rows[$row]);
+ foreach ($this->rowProperties[$row] as $property) {
+ $db->insert('imported_row_property', array(
+ 'row_checksum' => $this->quoteBinary($row),
+ 'property_checksum' => $property
+ ));
+ }
+ } catch (Exception $e) {
+ throw new IcingaException(
+ "Error while storing a row for '%s' into database: %s",
+ $rows[$row]['object_name'],
+ $e->getMessage()
+ );
+ }
+ }
+
+ foreach (array_keys($rows) as $row) {
+ $db->insert(
+ 'imported_rowset_row',
+ array(
+ 'rowset_checksum' => $this->quoteBinary($rowset),
+ 'row_checksum' => $this->quoteBinary($row)
+ )
+ );
+ }
+
+ $db->commit();
+
+ $this->rowsetExists = true;
+ } catch (Exception $e) {
+ try {
+ $db->rollBack();
+ } catch (Exception $e) {
+ // Well...
+ }
+ // Eventually throws details for invalid UTF8 characters
+ RecursiveUtf8Validator::validateRows($this->data);
+ throw $e;
+ }
+ }
+
+ /**
+ * Whether the last run of this import matches the given checksum
+ *
+ * @param string $checksum Binary checksum
+ *
+ * @return bool
+ */
+ protected function lastRowsetIs($checksum)
+ {
+ return $this->connection->getLatestImportedChecksum($this->source->get('id'))
+ === bin2hex($checksum);
+ }
+
+ /**
+ * Whether our rowset already exists in the database
+ *
+ * @return boolean
+ */
+ protected function rowsetExists()
+ {
+ if (null === $this->rowsetExists) {
+ $this->rowsetExists = 0 === count(
+ $this->newChecksums(
+ 'imported_rowset',
+ array($this->rowsetChecksum())
+ )
+ );
+ }
+
+ return $this->rowsetExists;
+ }
+
+ /**
+ * Finde new checksums for a specific table
+ *
+ * Accepts an array of checksums and gives you an array with those checksums
+ * that are missing in the given table
+ *
+ * @param string $table Database table name
+ * @param array $checksums Array with the checksums that should be verified
+ *
+ * @return array
+ */
+ protected function newChecksums($table, $checksums)
+ {
+ $db = $this->db;
+
+ // TODO: The following is a quickfix for binary data corrpution reported
+ // in https://github.com/zendframework/zf1/issues/655 caused by
+ // https://github.com/zendframework/zf1/commit/2ac9c30f
+ //
+ // Should be reverted once fixed, eventually with a check continueing
+ // to use this workaround for specific ZF versions (1.12.16 and 1.12.17
+ // so far). Alternatively we could also use a custom quoteInto method.
+
+ // The former query looked as follows:
+ //
+ // $query = $db->select()->from($table, 'checksum')
+ // ->where('checksum IN (?)', $checksums)
+ // ...
+ // return array_diff($checksums, $existing);
+
+ $hexed = array_map('bin2hex', $checksums);
+
+ $conn = $this->connection;
+ $query = $db
+ ->select()
+ ->from(
+ array('c' => $table),
+ array('checksum' => $conn->dbHexFunc('c.checksum'))
+ )->where(
+ $conn->dbHexFunc('c.checksum') . ' IN (?)',
+ $hexed
+ );
+
+ $existing = $db->fetchCol($query);
+ $new = array_diff($hexed, $existing);
+
+ return array_map('hex2bin', $new);
+ }
+
+ /**
+ * Sort a given stdClass object by property name
+ *
+ * @param stdClass $object
+ *
+ * @return object
+ */
+ protected function sortObject($object)
+ {
+ $array = (array) $object;
+ foreach ($array as $key => $val) {
+ $this->sortElement($val);
+ }
+ ksort($array);
+ return (object) $array;
+ }
+
+ /**
+ * Walk through a given array and sort all children
+ *
+ * Please note that the array itself will NOT be sorted, as arrays must
+ * keep their ordering
+ *
+ * @param array $array
+ */
+ protected function sortArrayObject(&$array)
+ {
+ foreach ($array as $key => $val) {
+ $this->sortElement($val);
+ }
+ }
+
+ /**
+ * Recursively sort a given property
+ *
+ * @param mixed $el
+ */
+ protected function sortElement(&$el)
+ {
+ if (is_array($el)) {
+ $this->sortArrayObject($el);
+ } elseif ($el instanceof stdClass) {
+ $el = $this->sortObject($el);
+ }
+ }
+
+ protected function quoteBinary($bin)
+ {
+ return $this->connection->quoteBinary($bin);
+ }
+}