diff options
Diffstat (limited to 'tests/integration/deckard/pydnstest/augwrap.py')
-rw-r--r-- | tests/integration/deckard/pydnstest/augwrap.py | 226 |
1 files changed, 226 insertions, 0 deletions
diff --git a/tests/integration/deckard/pydnstest/augwrap.py b/tests/integration/deckard/pydnstest/augwrap.py new file mode 100644 index 0000000..a0ec190 --- /dev/null +++ b/tests/integration/deckard/pydnstest/augwrap.py @@ -0,0 +1,226 @@ +#!/usr/bin/python3 + +# Copyright (C) 2017 + +import collections.abc +import logging +import os # requires posix + +from augeas import Augeas + +AUGEAS_LOAD_PATH = '/augeas/load/' +AUGEAS_FILES_PATH = '/files/' +AUGEAS_ERROR_PATH = '//error' + +log = logging.getLogger('augeas') + + +def join(*paths): + """ + join two Augeas tree paths + + FIXME: Beware: // is normalized to / + """ + norm_paths = [os.path.normpath(path) for path in paths] + # first path must be absolute + assert norm_paths[0][0] == '/' + new_paths = [norm_paths[0]] + # relativize all other paths so join works as expected + for path in norm_paths[1:]: + if path.startswith('/'): + path = path[1:] + new_paths.append(path) + new_path = os.path.join(*new_paths) + log.debug("join: new_path %s", new_path) + return os.path.normpath(new_path) + + +class AugeasWrapper: + """python-augeas higher-level wrapper. + + Load single augeas lens and configuration file. + Exposes configuration file as AugeasNode object with dict-like interface. + + AugeasWrapper can be used in with statement in the same way as file does. + """ + + def __init__(self, confpath, lens, root=None, loadpath=None, + flags=Augeas.NO_MODL_AUTOLOAD | Augeas.NO_LOAD | Augeas.ENABLE_SPAN): + """Parse configuration file using given lens. + + Params: + confpath (str): Absolute path to the configuration file + lens (str): Name of module containing Augeas lens + root: passed down to original Augeas + flags: passed down to original Augeas + loadpath: passed down to original Augeas + flags: passed down to original Augeas + """ + log.debug('loadpath: %s', loadpath) + log.debug('confpath: %s', confpath) + self._aug = Augeas(root=root, loadpath=loadpath, flags=flags) + + # /augeas/load/{lens} + aug_load_path = join(AUGEAS_LOAD_PATH, lens) + # /augeas/load/{lens}/lens = {lens}.lns + self._aug.set(join(aug_load_path, 'lens'), '%s.lns' % lens) + # /augeas/load/{lens}/incl[0] = {confpath} + self._aug.set(join(aug_load_path, 'incl[0]'), confpath) + self._aug.load() + + errors = self._aug.match(AUGEAS_ERROR_PATH) + if errors: + err_msg = '\n'.join( + ["{}: {}".format(e, self._aug.get(e)) for e in errors] + ) + raise RuntimeError(err_msg) + + path = join(AUGEAS_FILES_PATH, confpath) + paths = self._aug.match(path) + if len(paths) != 1: + raise ValueError('path %s did not match exactly once' % path) + self.tree = AugeasNode(self._aug, path) + self._loaded = True + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.save() + self.close() + + def save(self): + """Save Augeas tree to its original file.""" + assert self._loaded + try: + self._aug.save() + except IOError as exc: + log.exception(exc) + for err_path in self._aug.match('//error'): + log.error('%s: %s', err_path, + self._aug.get(os.path.join(err_path, 'message'))) + raise + + def close(self): + """ + close Augeas library + + After calling close() the object must not be used anymore. + """ + assert self._loaded + self._aug.close() + del self._aug + self._loaded = False + + def match(self, path): + """Yield AugeasNodes matching given expression.""" + assert self._loaded + assert path + log.debug('tree match %s', path) + for matched_path in self._aug.match(path): + yield AugeasNode(self._aug, matched_path) + + +class AugeasNode(collections.abc.MutableMapping): + """One Augeas tree node with dict-like interface.""" + + def __init__(self, aug, path): + """ + Args: + aug (AugeasWrapper or Augeas): Augeas library instance + path (str): absolute path in Augeas tree matching single node + + BEWARE: There are no sanity checks of given path for performance reasons. + """ + assert aug + assert path + assert path.startswith('/') + self._aug = aug + self._path = path + self._span = None + + @property + def path(self): + """canonical path in Augeas tree, read-only""" + return self._path + + @property + def value(self): + """ + get value of this node in Augeas tree + """ + value = self._aug.get(self._path) + log.debug('tree get: %s = %s', self._path, value) + return value + + @value.setter + def value(self, value): + """ + set value of this node in Augeas tree + """ + log.debug('tree set: %s = %s', self._path, value) + self._aug.set(self._path, value) + + @property + def span(self): + if self._span is None: + self._span = "char position %s" % self._aug.span(self._path)[5] + return self._span + + @property + def char(self): + return self._aug.span(self._path)[5] + + def __len__(self): + """ + number of items matching this path + + It is always 1 after __init__() but it may change + as Augeas tree changes. + """ + return len(self._aug.match(self._path)) + + def __getitem__(self, key): + if isinstance(key, int): + # int is a shortcut to write [int] + target_path = '%s[%s]' % (self._path, key) + else: + target_path = self._path + key + log.debug('tree getitem: target_path %s', target_path) + paths = self._aug.match(target_path) + if len(paths) != 1: + raise KeyError('path %s did not match exactly once' % target_path) + return AugeasNode(self._aug, target_path) + + def __delitem__(self, key): + log.debug('tree delitem: %s + %s', self._path, key) + target_path = self._path + key + log.debug('tree delitem: target_path %s', target_path) + self._aug.remove(target_path) + + def __setitem__(self, key, value): + assert isinstance(value, AugeasNode) + target_path = self.path + key + self._aug.copy(value.path, target_path) + + def __iter__(self): + self_path_len = len(self._path) + assert self_path_len > 0 + + log.debug('tree iter: %s', self._path) + for new_path in self._aug.match(self._path): + if len(new_path) == self_path_len: + yield '' + else: + yield new_path[self_path_len - 1:] + + def match(self, subpath): + """Yield AugeasNodes matching given sub-expression.""" + assert subpath.startswith("/") + match_path = "%s%s" % (self._path, subpath) + log.debug('tree match %s: %s', match_path, self._path) + for matched_path in self._aug.match(match_path): + yield AugeasNode(self._aug, matched_path) + + def __repr__(self): + return 'AugeasNode(%s)' % self._path |