summaryrefslogtreecommitdiffstats
path: root/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/common/config.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/common/config.py')
-rw-r--r--test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/common/config.py473
1 files changed, 473 insertions, 0 deletions
diff --git a/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/common/config.py b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/common/config.py
new file mode 100644
index 0000000..bc458eb
--- /dev/null
+++ b/test/support/network-integration/collections/ansible_collections/ansible/netcommon/plugins/module_utils/network/common/config.py
@@ -0,0 +1,473 @@
+# This code is part of Ansible, but is an independent component.
+# This particular file snippet, and this file snippet only, is BSD licensed.
+# Modules you write using this snippet, which is embedded dynamically by Ansible
+# still belong to the author of the module, and may assign their own license
+# to the complete work.
+#
+# (c) 2016 Red Hat Inc.
+#
+# Redistribution and use in source and binary forms, with or without modification,
+# are permitted provided that the following conditions are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+import re
+import hashlib
+
+from ansible.module_utils.six.moves import zip
+from ansible.module_utils._text import to_bytes, to_native
+
+DEFAULT_COMMENT_TOKENS = ["#", "!", "/*", "*/", "echo"]
+
+DEFAULT_IGNORE_LINES_RE = set(
+ [
+ re.compile(r"Using \d+ out of \d+ bytes"),
+ re.compile(r"Building configuration"),
+ re.compile(r"Current configuration : \d+ bytes"),
+ ]
+)
+
+
+try:
+ Pattern = re._pattern_type
+except AttributeError:
+ Pattern = re.Pattern
+
+
+class ConfigLine(object):
+ def __init__(self, raw):
+ self.text = str(raw).strip()
+ self.raw = raw
+ self._children = list()
+ self._parents = list()
+
+ def __str__(self):
+ return self.raw
+
+ def __eq__(self, other):
+ return self.line == other.line
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ def __getitem__(self, key):
+ for item in self._children:
+ if item.text == key:
+ return item
+ raise KeyError(key)
+
+ @property
+ def line(self):
+ line = self.parents
+ line.append(self.text)
+ return " ".join(line)
+
+ @property
+ def children(self):
+ return _obj_to_text(self._children)
+
+ @property
+ def child_objs(self):
+ return self._children
+
+ @property
+ def parents(self):
+ return _obj_to_text(self._parents)
+
+ @property
+ def path(self):
+ config = _obj_to_raw(self._parents)
+ config.append(self.raw)
+ return "\n".join(config)
+
+ @property
+ def has_children(self):
+ return len(self._children) > 0
+
+ @property
+ def has_parents(self):
+ return len(self._parents) > 0
+
+ def add_child(self, obj):
+ if not isinstance(obj, ConfigLine):
+ raise AssertionError("child must be of type `ConfigLine`")
+ self._children.append(obj)
+
+
+def ignore_line(text, tokens=None):
+ for item in tokens or DEFAULT_COMMENT_TOKENS:
+ if text.startswith(item):
+ return True
+ for regex in DEFAULT_IGNORE_LINES_RE:
+ if regex.match(text):
+ return True
+
+
+def _obj_to_text(x):
+ return [o.text for o in x]
+
+
+def _obj_to_raw(x):
+ return [o.raw for o in x]
+
+
+def _obj_to_block(objects, visited=None):
+ items = list()
+ for o in objects:
+ if o not in items:
+ items.append(o)
+ for child in o._children:
+ if child not in items:
+ items.append(child)
+ return _obj_to_raw(items)
+
+
+def dumps(objects, output="block", comments=False):
+ if output == "block":
+ items = _obj_to_block(objects)
+ elif output == "commands":
+ items = _obj_to_text(objects)
+ elif output == "raw":
+ items = _obj_to_raw(objects)
+ else:
+ raise TypeError("unknown value supplied for keyword output")
+
+ if output == "block":
+ if comments:
+ for index, item in enumerate(items):
+ nextitem = index + 1
+ if (
+ nextitem < len(items)
+ and not item.startswith(" ")
+ and items[nextitem].startswith(" ")
+ ):
+ item = "!\n%s" % item
+ items[index] = item
+ items.append("!")
+ items.append("end")
+
+ return "\n".join(items)
+
+
+class NetworkConfig(object):
+ def __init__(self, indent=1, contents=None, ignore_lines=None):
+ self._indent = indent
+ self._items = list()
+ self._config_text = None
+
+ if ignore_lines:
+ for item in ignore_lines:
+ if not isinstance(item, Pattern):
+ item = re.compile(item)
+ DEFAULT_IGNORE_LINES_RE.add(item)
+
+ if contents:
+ self.load(contents)
+
+ @property
+ def items(self):
+ return self._items
+
+ @property
+ def config_text(self):
+ return self._config_text
+
+ @property
+ def sha1(self):
+ sha1 = hashlib.sha1()
+ sha1.update(to_bytes(str(self), errors="surrogate_or_strict"))
+ return sha1.digest()
+
+ def __getitem__(self, key):
+ for line in self:
+ if line.text == key:
+ return line
+ raise KeyError(key)
+
+ def __iter__(self):
+ return iter(self._items)
+
+ def __str__(self):
+ return "\n".join([c.raw for c in self.items])
+
+ def __len__(self):
+ return len(self._items)
+
+ def load(self, s):
+ self._config_text = s
+ self._items = self.parse(s)
+
+ def loadfp(self, fp):
+ with open(fp) as f:
+ return self.load(f.read())
+
+ def parse(self, lines, comment_tokens=None):
+ toplevel = re.compile(r"\S")
+ childline = re.compile(r"^\s*(.+)$")
+ entry_reg = re.compile(r"([{};])")
+
+ ancestors = list()
+ config = list()
+
+ indents = [0]
+
+ for linenum, line in enumerate(
+ to_native(lines, errors="surrogate_or_strict").split("\n")
+ ):
+ text = entry_reg.sub("", line).strip()
+
+ cfg = ConfigLine(line)
+
+ if not text or ignore_line(text, comment_tokens):
+ continue
+
+ # handle top level commands
+ if toplevel.match(line):
+ ancestors = [cfg]
+ indents = [0]
+
+ # handle sub level commands
+ else:
+ match = childline.match(line)
+ line_indent = match.start(1)
+
+ if line_indent < indents[-1]:
+ while indents[-1] > line_indent:
+ indents.pop()
+
+ if line_indent > indents[-1]:
+ indents.append(line_indent)
+
+ curlevel = len(indents) - 1
+ parent_level = curlevel - 1
+
+ cfg._parents = ancestors[:curlevel]
+
+ if curlevel > len(ancestors):
+ config.append(cfg)
+ continue
+
+ for i in range(curlevel, len(ancestors)):
+ ancestors.pop()
+
+ ancestors.append(cfg)
+ ancestors[parent_level].add_child(cfg)
+
+ config.append(cfg)
+
+ return config
+
+ def get_object(self, path):
+ for item in self.items:
+ if item.text == path[-1]:
+ if item.parents == path[:-1]:
+ return item
+
+ def get_block(self, path):
+ if not isinstance(path, list):
+ raise AssertionError("path argument must be a list object")
+ obj = self.get_object(path)
+ if not obj:
+ raise ValueError("path does not exist in config")
+ return self._expand_block(obj)
+
+ def get_block_config(self, path):
+ block = self.get_block(path)
+ return dumps(block, "block")
+
+ def _expand_block(self, configobj, S=None):
+ if S is None:
+ S = list()
+ S.append(configobj)
+ for child in configobj._children:
+ if child in S:
+ continue
+ self._expand_block(child, S)
+ return S
+
+ def _diff_line(self, other):
+ updates = list()
+ for item in self.items:
+ if item not in other:
+ updates.append(item)
+ return updates
+
+ def _diff_strict(self, other):
+ updates = list()
+ # block extracted from other does not have all parents
+ # but the last one. In case of multiple parents we need
+ # to add additional parents.
+ if other and isinstance(other, list) and len(other) > 0:
+ start_other = other[0]
+ if start_other.parents:
+ for parent in start_other.parents:
+ other.insert(0, ConfigLine(parent))
+ for index, line in enumerate(self.items):
+ try:
+ if str(line).strip() != str(other[index]).strip():
+ updates.append(line)
+ except (AttributeError, IndexError):
+ updates.append(line)
+ return updates
+
+ def _diff_exact(self, other):
+ updates = list()
+ if len(other) != len(self.items):
+ updates.extend(self.items)
+ else:
+ for ours, theirs in zip(self.items, other):
+ if ours != theirs:
+ updates.extend(self.items)
+ break
+ return updates
+
+ def difference(self, other, match="line", path=None, replace=None):
+ """Perform a config diff against the another network config
+
+ :param other: instance of NetworkConfig to diff against
+ :param match: type of diff to perform. valid values are 'line',
+ 'strict', 'exact'
+ :param path: context in the network config to filter the diff
+ :param replace: the method used to generate the replacement lines.
+ valid values are 'block', 'line'
+
+ :returns: a string of lines that are different
+ """
+ if path and match != "line":
+ try:
+ other = other.get_block(path)
+ except ValueError:
+ other = list()
+ else:
+ other = other.items
+
+ # generate a list of ConfigLines that aren't in other
+ meth = getattr(self, "_diff_%s" % match)
+ updates = meth(other)
+
+ if replace == "block":
+ parents = list()
+ for item in updates:
+ if not item.has_parents:
+ parents.append(item)
+ else:
+ for p in item._parents:
+ if p not in parents:
+ parents.append(p)
+
+ updates = list()
+ for item in parents:
+ updates.extend(self._expand_block(item))
+
+ visited = set()
+ expanded = list()
+
+ for item in updates:
+ for p in item._parents:
+ if p.line not in visited:
+ visited.add(p.line)
+ expanded.append(p)
+ expanded.append(item)
+ visited.add(item.line)
+
+ return expanded
+
+ def add(self, lines, parents=None):
+ ancestors = list()
+ offset = 0
+ obj = None
+
+ # global config command
+ if not parents:
+ for line in lines:
+ # handle ignore lines
+ if ignore_line(line):
+ continue
+
+ item = ConfigLine(line)
+ item.raw = line
+ if item not in self.items:
+ self.items.append(item)
+
+ else:
+ for index, p in enumerate(parents):
+ try:
+ i = index + 1
+ obj = self.get_block(parents[:i])[0]
+ ancestors.append(obj)
+
+ except ValueError:
+ # add parent to config
+ offset = index * self._indent
+ obj = ConfigLine(p)
+ obj.raw = p.rjust(len(p) + offset)
+ if ancestors:
+ obj._parents = list(ancestors)
+ ancestors[-1]._children.append(obj)
+ self.items.append(obj)
+ ancestors.append(obj)
+
+ # add child objects
+ for line in lines:
+ # handle ignore lines
+ if ignore_line(line):
+ continue
+
+ # check if child already exists
+ for child in ancestors[-1]._children:
+ if child.text == line:
+ break
+ else:
+ offset = len(parents) * self._indent
+ item = ConfigLine(line)
+ item.raw = line.rjust(len(line) + offset)
+ item._parents = ancestors
+ ancestors[-1]._children.append(item)
+ self.items.append(item)
+
+
+class CustomNetworkConfig(NetworkConfig):
+ def items_text(self):
+ return [item.text for item in self.items]
+
+ def expand_section(self, configobj, S=None):
+ if S is None:
+ S = list()
+ S.append(configobj)
+ for child in configobj.child_objs:
+ if child in S:
+ continue
+ self.expand_section(child, S)
+ return S
+
+ def to_block(self, section):
+ return "\n".join([item.raw for item in section])
+
+ def get_section(self, path):
+ try:
+ section = self.get_section_objects(path)
+ return self.to_block(section)
+ except ValueError:
+ return list()
+
+ def get_section_objects(self, path):
+ if not isinstance(path, list):
+ path = [path]
+ obj = self.get_object(path)
+ if not obj:
+ raise ValueError("path does not exist in config")
+ return self.expand_section(obj)