summaryrefslogtreecommitdiffstats
path: root/tests/integration/deckard/pydnstest/augwrap.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/integration/deckard/pydnstest/augwrap.py')
-rw-r--r--tests/integration/deckard/pydnstest/augwrap.py226
1 files changed, 226 insertions, 0 deletions
diff --git a/tests/integration/deckard/pydnstest/augwrap.py b/tests/integration/deckard/pydnstest/augwrap.py
new file mode 100644
index 0000000..a0ec190
--- /dev/null
+++ b/tests/integration/deckard/pydnstest/augwrap.py
@@ -0,0 +1,226 @@
+#!/usr/bin/python3
+
+# Copyright (C) 2017
+
+import collections.abc
+import logging
+import os # requires posix
+
+from augeas import Augeas
+
+AUGEAS_LOAD_PATH = '/augeas/load/'
+AUGEAS_FILES_PATH = '/files/'
+AUGEAS_ERROR_PATH = '//error'
+
+log = logging.getLogger('augeas')
+
+
+def join(*paths):
+ """
+ join two Augeas tree paths
+
+ FIXME: Beware: // is normalized to /
+ """
+ norm_paths = [os.path.normpath(path) for path in paths]
+ # first path must be absolute
+ assert norm_paths[0][0] == '/'
+ new_paths = [norm_paths[0]]
+ # relativize all other paths so join works as expected
+ for path in norm_paths[1:]:
+ if path.startswith('/'):
+ path = path[1:]
+ new_paths.append(path)
+ new_path = os.path.join(*new_paths)
+ log.debug("join: new_path %s", new_path)
+ return os.path.normpath(new_path)
+
+
+class AugeasWrapper:
+ """python-augeas higher-level wrapper.
+
+ Load single augeas lens and configuration file.
+ Exposes configuration file as AugeasNode object with dict-like interface.
+
+ AugeasWrapper can be used in with statement in the same way as file does.
+ """
+
+ def __init__(self, confpath, lens, root=None, loadpath=None,
+ flags=Augeas.NO_MODL_AUTOLOAD | Augeas.NO_LOAD | Augeas.ENABLE_SPAN):
+ """Parse configuration file using given lens.
+
+ Params:
+ confpath (str): Absolute path to the configuration file
+ lens (str): Name of module containing Augeas lens
+ root: passed down to original Augeas
+ flags: passed down to original Augeas
+ loadpath: passed down to original Augeas
+ flags: passed down to original Augeas
+ """
+ log.debug('loadpath: %s', loadpath)
+ log.debug('confpath: %s', confpath)
+ self._aug = Augeas(root=root, loadpath=loadpath, flags=flags)
+
+ # /augeas/load/{lens}
+ aug_load_path = join(AUGEAS_LOAD_PATH, lens)
+ # /augeas/load/{lens}/lens = {lens}.lns
+ self._aug.set(join(aug_load_path, 'lens'), '%s.lns' % lens)
+ # /augeas/load/{lens}/incl[0] = {confpath}
+ self._aug.set(join(aug_load_path, 'incl[0]'), confpath)
+ self._aug.load()
+
+ errors = self._aug.match(AUGEAS_ERROR_PATH)
+ if errors:
+ err_msg = '\n'.join(
+ ["{}: {}".format(e, self._aug.get(e)) for e in errors]
+ )
+ raise RuntimeError(err_msg)
+
+ path = join(AUGEAS_FILES_PATH, confpath)
+ paths = self._aug.match(path)
+ if len(paths) != 1:
+ raise ValueError('path %s did not match exactly once' % path)
+ self.tree = AugeasNode(self._aug, path)
+ self._loaded = True
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.save()
+ self.close()
+
+ def save(self):
+ """Save Augeas tree to its original file."""
+ assert self._loaded
+ try:
+ self._aug.save()
+ except IOError as exc:
+ log.exception(exc)
+ for err_path in self._aug.match('//error'):
+ log.error('%s: %s', err_path,
+ self._aug.get(os.path.join(err_path, 'message')))
+ raise
+
+ def close(self):
+ """
+ close Augeas library
+
+ After calling close() the object must not be used anymore.
+ """
+ assert self._loaded
+ self._aug.close()
+ del self._aug
+ self._loaded = False
+
+ def match(self, path):
+ """Yield AugeasNodes matching given expression."""
+ assert self._loaded
+ assert path
+ log.debug('tree match %s', path)
+ for matched_path in self._aug.match(path):
+ yield AugeasNode(self._aug, matched_path)
+
+
+class AugeasNode(collections.abc.MutableMapping):
+ """One Augeas tree node with dict-like interface."""
+
+ def __init__(self, aug, path):
+ """
+ Args:
+ aug (AugeasWrapper or Augeas): Augeas library instance
+ path (str): absolute path in Augeas tree matching single node
+
+ BEWARE: There are no sanity checks of given path for performance reasons.
+ """
+ assert aug
+ assert path
+ assert path.startswith('/')
+ self._aug = aug
+ self._path = path
+ self._span = None
+
+ @property
+ def path(self):
+ """canonical path in Augeas tree, read-only"""
+ return self._path
+
+ @property
+ def value(self):
+ """
+ get value of this node in Augeas tree
+ """
+ value = self._aug.get(self._path)
+ log.debug('tree get: %s = %s', self._path, value)
+ return value
+
+ @value.setter
+ def value(self, value):
+ """
+ set value of this node in Augeas tree
+ """
+ log.debug('tree set: %s = %s', self._path, value)
+ self._aug.set(self._path, value)
+
+ @property
+ def span(self):
+ if self._span is None:
+ self._span = "char position %s" % self._aug.span(self._path)[5]
+ return self._span
+
+ @property
+ def char(self):
+ return self._aug.span(self._path)[5]
+
+ def __len__(self):
+ """
+ number of items matching this path
+
+ It is always 1 after __init__() but it may change
+ as Augeas tree changes.
+ """
+ return len(self._aug.match(self._path))
+
+ def __getitem__(self, key):
+ if isinstance(key, int):
+ # int is a shortcut to write [int]
+ target_path = '%s[%s]' % (self._path, key)
+ else:
+ target_path = self._path + key
+ log.debug('tree getitem: target_path %s', target_path)
+ paths = self._aug.match(target_path)
+ if len(paths) != 1:
+ raise KeyError('path %s did not match exactly once' % target_path)
+ return AugeasNode(self._aug, target_path)
+
+ def __delitem__(self, key):
+ log.debug('tree delitem: %s + %s', self._path, key)
+ target_path = self._path + key
+ log.debug('tree delitem: target_path %s', target_path)
+ self._aug.remove(target_path)
+
+ def __setitem__(self, key, value):
+ assert isinstance(value, AugeasNode)
+ target_path = self.path + key
+ self._aug.copy(value.path, target_path)
+
+ def __iter__(self):
+ self_path_len = len(self._path)
+ assert self_path_len > 0
+
+ log.debug('tree iter: %s', self._path)
+ for new_path in self._aug.match(self._path):
+ if len(new_path) == self_path_len:
+ yield ''
+ else:
+ yield new_path[self_path_len - 1:]
+
+ def match(self, subpath):
+ """Yield AugeasNodes matching given sub-expression."""
+ assert subpath.startswith("/")
+ match_path = "%s%s" % (self._path, subpath)
+ log.debug('tree match %s: %s', match_path, self._path)
+ for matched_path in self._aug.match(match_path):
+ yield AugeasNode(self._aug, matched_path)
+
+ def __repr__(self):
+ return 'AugeasNode(%s)' % self._path