# # (c) 2017 Red Hat Inc. # (c) 2017 Kedar Kekan (kkekan@redhat.com) # # This file is part of Ansible # # Ansible is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Ansible is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Ansible. If not, see . # from __future__ import absolute_import, division, print_function __metaclass__ = type DOCUMENTATION = """ author: Ansible Networking Team (@ansible-network) name: iosxr short_description: Use iosxr netconf plugin to run netconf commands on Cisco IOSXR platform description: - This iosxr plugin provides low level abstraction apis for sending and receiving netconf commands from Cisco iosxr network devices. version_added: 1.0.0 options: ncclient_device_handler: type: str default: iosxr description: - Specifies the ncclient device handler name for Cisco iosxr network os. To identify the ncclient device handler name refer ncclient library documentation. """ import collections import json import re from ansible.errors import AnsibleConnectionFailure from ansible.module_utils._text import to_native, to_text from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.netconf import ( remove_namespaces, ) from ansible_collections.ansible.netcommon.plugins.plugin_utils.netconf_base import ( NetconfBase, ensure_ncclient, ) from ansible_collections.cisco.iosxr.plugins.module_utils.network.iosxr.iosxr import ( build_xml, etree_find, ) try: from ncclient import manager from ncclient.operations import RPCError from ncclient.transport.errors import SSHUnknownHostError from ncclient.xml_ import to_xml HAS_NCCLIENT = True except ( ImportError, AttributeError, ): # paramiko and gssapi are incompatible and raise AttributeError not ImportError HAS_NCCLIENT = False class Netconf(NetconfBase): def get_device_info(self): device_info = {} device_info["network_os"] = "iosxr" install_meta = collections.OrderedDict() install_meta.update( [ ("prepare", {"xpath": "install/prepare", "tag": True}), ( "prepared-boot-image", { "xpath": "install/prepare/prepared-boot-image", "tag": True, }, ), ("version", {"xpath": "install/version", "tag": True}), ("label", {"xpath": "install/version/label", "tag": True}), ( "hardware-info", {"xpath": "install/version/hardware-info", "tag": True}, ), ("package", {"xpath": "install/version/package", "tag": True}), ], ) install_filter = build_xml( "install", install_meta, opcode="filter", namespace="install", ) try: reply = self.get(install_filter) resp = remove_namespaces( re.sub(r'<\?xml version="1.0" encoding="UTF-8"\?>', "", reply), ) ele_package_name = etree_find(resp.strip(), "name") if ele_package_name is not None: device_info["network_os_package"] = ele_package_name.text ele_label = etree_find(resp.strip(), "label") if ele_label is not None: device_info["network_os_version"] = ele_label.text model_search_strs = [ r"^[Cc]isco (.+) \(\) processor", r"^[Cc]isco (.+) \(revision", r"^[Cc]isco (\S+ \S+).+bytes of .*memory", ] ele_hardware_info = etree_find(resp.strip(), "hardware-info") if ele_hardware_info is not None: for item in model_search_strs: match = re.search(item, ele_hardware_info.text, re.M) if match: device_info["network_os_model"] = match.group(1) break except Exception as exc: error_msg = to_text(exc, errors="surrogate_or_strict").strip() if "bad-namespace" in error_msg: device_info = self.get_device_info_old_version() else: self._connection.queue_message( "vvvv", "Fail to retrieve device info %s" % exc, ) try: hostname_filter = build_xml( "host-names", opcode="filter", namespace="host-names", ) reply = self.get(hostname_filter) resp = remove_namespaces( re.sub(r'<\?xml version="1.0" encoding="UTF-8"\?>', "", reply), ) hostname_ele = etree_find(resp.strip(), "host-name") device_info["network_os_hostname"] = ( hostname_ele.text if hostname_ele is not None else None ) except Exception as exc: self._connection.queue_message( "vvvv", "Fail to retrieve device info %s" % exc, ) return device_info def get_capabilities(self): result = dict() result["rpc"] = self.get_base_rpc() result["network_api"] = "netconf" result["device_info"] = self.get_device_info() result["server_capabilities"] = list(self.m.server_capabilities) result["client_capabilities"] = list(self.m.client_capabilities) result["session_id"] = self.m.session_id result["device_operations"] = self.get_device_operations( result["server_capabilities"], ) return json.dumps(result) @staticmethod @ensure_ncclient def guess_network_os(obj): """ Guess the remote network os name :param obj: Netconf connection class object :return: Network OS name """ try: m = manager.connect( host=obj._play_context.remote_addr, port=obj._play_context.port or 830, username=obj._play_context.remote_user, password=obj._play_context.password, key_filename=obj.key_filename, hostkey_verify=obj.get_option("host_key_checking"), look_for_keys=obj.get_option("look_for_keys"), allow_agent=obj._play_context.allow_agent, timeout=obj.get_option("persistent_connect_timeout"), # We need to pass in the path to the ssh_config file when guessing # the network_os so that a jumphost is correctly used if defined ssh_config=obj._ssh_config, ) except SSHUnknownHostError as exc: raise AnsibleConnectionFailure(to_native(exc)) guessed_os = None for c in m.server_capabilities: if re.search("IOS-XR", c): guessed_os = "iosxr" break m.close_session() return guessed_os # TODO: change .xml to .data_xml, when ncclient supports data_xml on all platforms def get(self, filter=None, remove_ns=False): if isinstance(filter, list): filter = tuple(filter) try: resp = self.m.get(filter=filter) if remove_ns: response = remove_namespaces(resp) else: response = resp.data_xml if hasattr(resp, "data_xml") else resp.xml return response except RPCError as exc: raise Exception(to_xml(exc.xml)) def get_config(self, source=None, filter=None, remove_ns=False): if isinstance(filter, list): filter = tuple(filter) try: resp = self.m.get_config(source=source, filter=filter) if remove_ns: response = remove_namespaces(resp) else: response = resp.data_xml if hasattr(resp, "data_xml") else resp.xml return response except RPCError as exc: raise Exception(to_xml(exc.xml)) def edit_config( self, config=None, format="xml", target="candidate", default_operation=None, test_option=None, error_option=None, remove_ns=False, ): if config is None: raise ValueError("config value must be provided") try: resp = self.m.edit_config( config, format=format, target=target, default_operation=default_operation, test_option=test_option, error_option=error_option, ) if remove_ns: response = remove_namespaces(resp) else: response = resp.data_xml if hasattr(resp, "data_xml") else resp.xml return response except RPCError as exc: raise Exception(to_xml(exc.xml)) def commit( self, confirmed=False, timeout=None, persist=None, remove_ns=False, ): timeout = to_text(timeout, errors="surrogate_or_strict") try: resp = self.m.commit( confirmed=confirmed, timeout=timeout, persist=persist, ) if remove_ns: response = remove_namespaces(resp) else: response = resp.data_xml if hasattr(resp, "data_xml") else resp.xml return response except RPCError as exc: raise Exception(to_xml(exc.xml)) def validate(self, source="candidate", remove_ns=False): try: resp = self.m.validate(source=source) if remove_ns: response = remove_namespaces(resp) else: response = resp.data_xml if hasattr(resp, "data_xml") else resp.xml return response except RPCError as exc: raise Exception(to_xml(exc.xml)) def discard_changes(self, remove_ns=False): try: resp = self.m.discard_changes() if remove_ns: response = remove_namespaces(resp) else: response = resp.data_xml if hasattr(resp, "data_xml") else resp.xml return response except RPCError as exc: raise Exception(to_xml(exc.xml)) def get_device_info_old_version(self): device_info = {} device_info["network_os"] = "iosxr" install_meta = collections.OrderedDict() install_meta.update( [ ( "boot-variables", {"xpath": "install/boot-variables", "tag": True}, ), ( "boot-variable", { "xpath": "install/boot-variables/boot-variable", "tag": True, "lead": True, }, ), ("software", {"xpath": "install/software", "tag": True}), ( "alias-devices", {"xpath": "install/software/alias-devices", "tag": True}, ), ( "alias-device", { "xpath": "install/software/alias-devices/alias-device", "tag": True, }, ), ( "m:device-name", { "xpath": "install/software/alias-devices/alias-device/device-name", "value": "disk0:", }, ), ], ) install_filter = build_xml( "install", install_meta, opcode="filter", namespace="install_old", ) try: reply = self.get(install_filter) resp = remove_namespaces( re.sub(r'<\?xml version="1.0" encoding="UTF-8"\?>', "", reply), ) ele_boot_variable = etree_find(resp, "boot-variable/boot-variable") if ele_boot_variable is not None: device_info["network_os_image"] = re.split( "[:|,]", ele_boot_variable.text, )[1] ele_package_name = etree_find(reply, "package-name") if ele_package_name is not None: device_info["network_os_package"] = ele_package_name.text device_info["network_os_version"] = re.split( "-", ele_package_name.text, )[-1] except Exception as exc: self._connection.queue_message( "vvvv", "Fail to retrieve device info %s" % exc, ) return device_info