diff options
-rw-r--r-- | Makefile | 4 | ||||
-rw-r--r-- | VERSION | 2 | ||||
-rw-r--r-- | cvprac/__init__.py | 2 | ||||
-rw-r--r-- | cvprac/cvp_api.py | 830 | ||||
-rw-r--r-- | cvprac/cvp_client.py | 129 | ||||
-rw-r--r-- | dev-requirements.txt | 1 | ||||
-rw-r--r-- | docs/labs/lab06-provisioning/vc_task_retrigger.py | 4 | ||||
-rw-r--r-- | docs/release-notes-1.4.0.rst | 20 | ||||
-rw-r--r-- | requirements.txt | 1 | ||||
-rw-r--r-- | setup.py | 11 |
10 files changed, 509 insertions, 495 deletions
@@ -49,7 +49,9 @@ coverage_report: pep8: -pep8 -r --max-line-length=120 --ignore=$(PEP8_IGNORE) cvprac/ - -pep8 -r --max-line-length=120 --ignore=$(PEP8_IGNORE),E402 test/ + -pep8 -r --max-line-length=120 --ignore=$(PEP8_IGNORE),E402 test/lib/ + -pep8 -r --max-line-length=120 --ignore=$(PEP8_IGNORE),E402 test/system/ + -pep8 -r --ignore=$(PEP8_IGNORE),E402,E501 test/unit/ pyflakes: pyflakes cvprac/ test/ @@ -1 +1 @@ -1.3.2 +1.4.0 diff --git a/cvprac/__init__.py b/cvprac/__init__.py index 8d62fa0..67d57ff 100644 --- a/cvprac/__init__.py +++ b/cvprac/__init__.py @@ -32,5 +32,5 @@ ''' RESTful API Client class for Cloudvision(R) Portal ''' -__version__ = '1.3.2' +__version__ = '1.4.0' __author__ = 'Arista Networks, Inc.' diff --git a/cvprac/cvp_api.py b/cvprac/cvp_api.py index c390989..0d73658 100644 --- a/cvprac/cvp_api.py +++ b/cvprac/cvp_api.py @@ -1,3 +1,4 @@ +# pylint: disable=fixme,too-many-locals,redefined-builtin # # Copyright (c) 2017, Arista Networks, Inc. # All rights reserved. @@ -35,7 +36,6 @@ import operator import os import time # This import is for proper file IO handling support for both Python 2 and 3 -# pylint: disable=redefined-builtin from io import open from datetime import datetime from re import split @@ -56,7 +56,53 @@ OPERATOR_DICT = { } -class CvpApi(object): +def sanitize_warnings(data): + ''' Sanitize the warnings returned after validation. + + In some cases where the configlets have both errors + and warnings, CVP may split warnings that have + `,` across multiple strings. + This method concatenates the strings back into one string + per warning, and corrects the warningCount. + + Args: + data (dict): A dict that contians the result + of the validation operation + Returns: + response (dict): A dict that contains the result of the + validation operation + ''' + if "warnings" not in data: + # nothing to do here, we can return as is + return data + # Since there may be warnings incorrectly split on + # ', ' within the warning text by CVP, we join all the + # warnings together using ', ' into one large string + temp_warnings = ", ".join(data['warnings']).strip() + + # To split the large string again we match on the + # 'at line XXX' that should indicate the end of the warning. + # We capture as well the remaining \\n or whitespace and include + # the extra ', ' added in the previous step in the matching criteria. + # The extra ', ' is not included in the strings of the new list + temp_warnings = split( + r'(.*?at line \d+.*?),\s+', + temp_warnings + ) + + # The behaviour of re.split will add empty strings + # if the regex matches on the begging or ending of the line. + # Refer to https://docs.python.org/3/library/re.html#re.split + + # Use filter to remove any empty strings + # that re.split inserted + data['warnings'] = list(filter(None, temp_warnings)) + # Update the count of warnings to the correct value + data['warningCount'] = len(data['warnings']) + return data + + +class CvpApi(): ''' CvpApi class contains calls to CVP RESTful API. The RESTful API parameters are passed in as parameters to the method. The results of the RESTful API call are converted from json to a dict and returned. @@ -110,8 +156,7 @@ class CvpApi(object): running CVP version to. ''' if opr not in OPERATOR_DICT: - self.log.error('%s is an invalid operation for version comparison' - % opr) + self.log.error("%s is an invalid operation for version comparison", opr) return False # Since CVaaS is automatically the latest version of the API, if @@ -165,9 +210,9 @@ class CvpApi(object): user_type (str): type of AAA (Local/TACACS/RADIUS) ''' if status not in ['Enabled', 'Disabled']: - self.log.error('Invalid status %s.' - ' Status must be Enabled or Disabled.' - ' Defaulting to Disabled' % status) + self.log.error(f"Invalid status {status}." + " Status must be Enabled or Disabled." + " Defaulting to Disabled") status = 'Disabled' data = {"roles": [role], "user": {"contactNumber": "", @@ -198,9 +243,9 @@ class CvpApi(object): user_type (str): type of AAA (Local/TACACS/RADIUS) ''' if status not in ['Enabled', 'Disabled']: - self.log.error('Invalid status %s.' - ' Status must be Enabled or Disabled.' - ' Defaulting to Disabled' % status) + self.log.error(f"Invalid status {status}." + f" Status must be Enabled or Disabled." + f" Defaulting to Disabled") status = 'Disabled' data = {"roles": [role], "user": {"contactNumber": "", @@ -211,7 +256,7 @@ class CvpApi(object): "userId": username, "userStatus": status, "userType": user_type}} - return self.clnt.post('/user/updateUser.do?userId={}'.format(username), + return self.clnt.post(f"/user/updateUser.do?userId={username}", data=data, timeout=self.request_timeout) def get_user(self, username): @@ -220,7 +265,7 @@ class CvpApi(object): Args: username (str): username on CVP ''' - return self.clnt.get('/user/getUser.do?userId={}'.format(username), + return self.clnt.get(f"/user/getUser.do?userId={qplus(username)}", timeout=self.request_timeout) def get_users(self, query='', start=0, end=0): @@ -248,10 +293,9 @@ class CvpApi(object): 'currentStatus': 'Online', 'addedByUser': 'cvp system'}]} ''' - self.log.debug('get_users: query: %s' % query) - return self.clnt.get('/user/getUsers.do?' - 'queryparam=%s&startIndex=%d&endIndex=%d' % - (qplus(query), start, end), + self.log.debug(f"get_users: query: {query}") + return self.clnt.get(f"/user/getUsers.do?" + f"queryparam={qplus(query)}&startIndex={start}&endIndex={end}", timeout=self.request_timeout) def delete_user(self, username): @@ -275,12 +319,12 @@ class CvpApi(object): task (dict): The CVP task for the associated Id. Returns None if the task_id was invalid. ''' - self.log.debug('get_task_by_id: task_id: %s' % task_id) + self.log.debug(f"get_task_by_id: task_id: {task_id}") try: - task = self.clnt.get('/task/getTaskById.do?taskId=%s' % task_id, + task = self.clnt.get(f"/task/getTaskById.do?taskId={task_id}", timeout=self.request_timeout) except CvpApiError as error: - self.log.debug('Caught error: %s attempting to get task.' % error) + self.log.debug(f"Caught error: {error} attempting to get task.") # Catch an invalid task_id error and return None return None return task @@ -297,10 +341,10 @@ class CvpApi(object): Returns: tasks (list): The list of tasks ''' - self.log.debug('get_tasks_by_status: status: %s' % status) + self.log.debug(f"get_tasks_by_status: status: {status}") data = self.clnt.get( - '/task/getTasks.do?queryparam=%s&startIndex=%d&endIndex=%d' % - (status, start, end), timeout=self.request_timeout) + f"/task/getTasks.do?queryparam={status}&startIndex={start}&endIndex={end}", + timeout=self.request_timeout) return data['data'] def get_tasks(self, start=0, end=0): @@ -316,9 +360,8 @@ class CvpApi(object): the 'data' key contains a list of the tasks. ''' self.log.debug('get_tasks:') - return self.clnt.get('/task/getTasks.do?queryparam=&startIndex=%d&' - 'endIndex=%d' % (start, end), - timeout=self.request_timeout) + return self.clnt.get(f"/task/getTasks.do?queryparam=&startIndex={start}&" + f"endIndex={end}", timeout=self.request_timeout) def get_logs_by_id(self, task_id, start=0, end=0): ''' Returns the log entries for the task with the specified TaskId. @@ -334,14 +377,13 @@ class CvpApi(object): task (dict): The CVP log for the associated Id. Returns None if the task_id was invalid. ''' - self.log.debug('get_logs_by_id: task_id: %s' % task_id) + self.log.debug(f"get_logs_by_id: task_id: {task_id}") if self.clnt.apiversion is None: self.get_cvp_info() if self.clnt.apiversion < 5.0: self.log.debug('v1 - v4 /task/getLogsByID.do?') - resp = self.clnt.get('/task/getLogsById.do?id=%s&queryparam=' - '&startIndex=%d&endIndex=%d' % - (task_id, start, end), + resp = self.clnt.get(f"/task/getLogsById.do?id={task_id}&queryparam=" + f"&startIndex={start}&endIndex={end}", timeout=self.request_timeout) else: self.log.debug('v5 /audit/getLogs.do') @@ -350,22 +392,21 @@ class CvpApi(object): if 'stageId' in task_info: stage_id = task_info['stageId'] else: - self.log.debug('No stage ID found for task %s' % task_id) + self.log.debug(f"No stage ID found for task {task_id}") if 'ccIdV2' in task_info: cc_id = task_info['ccIdV2'] if cc_id == '': - self.log.debug('No ccIdV2 for task %s.' - ' It was likely cancelled.' - ' Using old /task/getLogsByID.do?' - % task_id) + self.log.debug(f"No ccIdV2 for task {task_id}." + f" It was likely cancelled." + f" Using old /task/getLogsByID.do?") resp = self.clnt.get( - '/task/getLogsById.do?id=%s&queryparam=' - '&startIndex=%d&endIndex=%d' % (task_id, start, end), + f"/task/getLogsById.do?id={task_id}&queryparam=" + f"&startIndex={start}&endIndex={end}", timeout=self.request_timeout) else: resp = self.get_audit_logs_by_id(cc_id, stage_id) else: - self.log.debug('No change ID found for task %s' % task_id) + self.log.debug(f"No change ID found for task {task_id}") resp = None return resp @@ -398,8 +439,7 @@ class CvpApi(object): task_id (str): Task ID note (str): Note to add to the task ''' - self.log.debug('add_note_to_task: task_id: %s note: %s' % - (task_id, note)) + self.log.debug(f"add_note_to_task: task_id: {task_id} note: {note}") data = {'workOrderId': task_id, 'note': note} self.clnt.post('/task/addNoteToTask.do', data=data, timeout=self.request_timeout) @@ -416,10 +456,9 @@ class CvpApi(object): Args: task_id (str): Task ID ''' - self.log.debug('execute_task: task_id: %s' % task_id) + self.log.debug(f"execute_task: task_id: {task_id}") data = {'data': [task_id]} - self.clnt.post('/task/executeTask.do', data=data, - timeout=self.request_timeout) + self.clnt.post('/task/executeTask.do', data=data, timeout=self.request_timeout) def cancel_task(self, task_id): ''' Cancel the task @@ -427,7 +466,7 @@ class CvpApi(object): Args: task_id (str): Task ID ''' - self.log.debug('cancel_task: task_id: %s' % task_id) + self.log.debug(f"cancel_task: task_id: {task_id}") data = {'data': [task_id]} return self.clnt.post('/task/cancelTask.do', data=data, timeout=self.request_timeout) @@ -442,29 +481,28 @@ class CvpApi(object): ''' if self.clnt.apiversion is None: self.get_cvp_info() - configlets = self.clnt.get('/configlet/getConfiglets.do?' - 'startIndex=%d&endIndex=%d' % (start, end), + configlets = self.clnt.get(f"/configlet/getConfiglets.do?" + f"startIndex={start}&endIndex={end}", timeout=self.request_timeout) if self.clnt.apiversion == 1.0 or self.clnt.apiversion >= 4.0: self.log.debug('v1/v4+ Inventory API Call') return configlets - else: - self.log.debug('v2 Inventory API Call') - # New API getConfiglets does not return the actual configlet config - # Get the actual configlet config using getConfigletByName - if 'data' in configlets: - for configlet in configlets['data']: - full_cfglt_data = self.get_configlet_by_name( - configlet['name']) - configlet['config'] = full_cfglt_data['config'] - return configlets + self.log.debug('v2 Inventory API Call') + # New API getConfiglets does not return the actual configlet config + # Get the actual configlet config using getConfigletByName + if 'data' in configlets: + for configlet in configlets['data']: + full_cfglt_data = self.get_configlet_by_name(configlet['name']) + configlet['config'] = full_cfglt_data['config'] + return configlets def get_configlets_and_mappers(self): ''' Returns a list of all defined configlets and associated mappers ''' self.log.debug( 'get_configlets_and_mappers: getConfigletsAndAssociatedMappers') - return self.clnt.get('/configlet/getConfigletsAndAssociatedMappers.do') + return self.clnt.get('/configlet/getConfigletsAndAssociatedMappers.do', + timeout=self.request_timeout) def get_configlet_builder(self, c_id): ''' Returns the configlet builder data for the given configlet ID. @@ -472,8 +510,8 @@ class CvpApi(object): Args: c_id (str): The ID (key) for the configlet to be queried. ''' - return self.clnt.get('/configlet/getConfigletBuilder.do?id=%s' - % c_id, timeout=self.request_timeout) + return self.clnt.get(f"/configlet/getConfigletBuilder.do?id={c_id}", + timeout=self.request_timeout) def search_configlets(self, query, start=0, end=0): ''' Returns a list of configlets that match a search query. @@ -485,10 +523,9 @@ class CvpApi(object): end (int): End index for the pagination. If end index is 0 then all the records will be returned. Default is 0. ''' - self.log.debug('search_configlets: query: %s' % query) - return self.clnt.get('/configlet/searchConfiglets.do?' - 'queryparam=%s&startIndex=%d&endIndex=%d' % - (qplus(query), start, end), + self.log.debug(f"search_configlets: query: {query}") + return self.clnt.get(f"/configlet/searchConfiglets.do?" + f"queryparam={qplus(query)}&startIndex={start}&endIndex={end}", timeout=self.request_timeout) def get_configlet_by_name(self, name): @@ -500,9 +537,9 @@ class CvpApi(object): Returns: configlet (dict): The configlet dict. ''' - self.log.debug('get_configlets_by_name: name: %s' % name) - return self.clnt.get('/configlet/getConfigletByName.do?name=%s' - % qplus(name), timeout=self.request_timeout) + self.log.debug(f"get_configlets_by_name: name: {name}") + return self.clnt.get(f"/configlet/getConfigletByName.do?name={qplus(name)}", + timeout=self.request_timeout) def get_configlets_by_container_id(self, c_id, start=0, end=0): ''' Returns a list of configlets applied to the given container. @@ -513,9 +550,8 @@ class CvpApi(object): end (int): End index for the pagination. If end index is 0 then all the records will be returned. Default is 0. ''' - return self.clnt.get('/provisioning/getConfigletsByContainerId.do?' - 'containerId=%s&startIndex=%d&endIndex=%d' - % (c_id, start, end), + return self.clnt.get(f"/provisioning/getConfigletsByContainerId.do?" + f"containerId={c_id}&startIndex={start}&endIndex={end}", timeout=self.request_timeout) def get_configlets_by_netelement_id(self, d_id, start=0, end=0): @@ -527,9 +563,8 @@ class CvpApi(object): end (int): End index for the pagination. If end index is 0 then all the records will be returned. Default is 0. ''' - return self.clnt.get('/provisioning/getConfigletsByNetElementId.do?' - 'netElementId=%s&startIndex=%d&endIndex=%d' - % (d_id, start, end), + return self.clnt.get(f"/provisioning/getConfigletsByNetElementId.do?" + f"netElementId={d_id}&startIndex={start}&endIndex={end}", timeout=self.request_timeout) def get_image_bundle_by_container_id(self, container_id, start=0, end=0, @@ -542,15 +577,13 @@ class CvpApi(object): then all the records will be returned. Default is 0. scope (string) the session scope (true or false). ''' - if scope != 'true' and scope != 'false': - self.log.error('scope value must be true or false.' - ' %s is an invalid value.' - ' Defaulting back to false' % scope) + if scope not in ('true', 'false'): + self.log.error("scope value must be true or false. %s is an invalid value." + " Defaulting back to false", scope) scope = 'false' - return self.clnt.get('/provisioning/getImageBundleByContainerId.do?' - 'containerId=%s&startIndex=%d&endIndex=%d' - '&sessionScope=%s' - % (container_id, start, end, scope), + return self.clnt.get(f"/provisioning/getImageBundleByContainerId.do?" + f"containerId={container_id}&startIndex={start}&endIndex={end}" + f"&sessionScope={scope}", timeout=self.request_timeout) def get_configlet_history(self, key, start=0, end=0): @@ -567,10 +600,10 @@ class CvpApi(object): history (dict): The configlet dict with the changes from most recent to oldest. ''' - self.log.debug('get_configlets_history: key: %s' % key) - return self.clnt.get('/configlet/getConfigletHistory.do?configletId=' - '%s&queryparam=&startIndex=%d&endIndex=%d' % - (key, start, end), timeout=self.request_timeout) + self.log.debug(f"get_configlets_history: key: {key}") + return self.clnt.get(f"/configlet/getConfigletHistory.do?configletId=" + f"{key}&queryparam=&startIndex={start}&endIndex={end}", + timeout=self.request_timeout) def get_inventory(self, start=0, end=0, query='', provisioned=True): ''' Returns the a dict of the net elements known to CVP. @@ -589,13 +622,12 @@ class CvpApi(object): self.get_cvp_info() if self.clnt.apiversion == 1.0: self.log.debug('v1 Inventory API Call') - data = self.clnt.get('/inventory/getInventory.do?' - 'queryparam=%s&startIndex=%d&endIndex=%d' % - (qplus(query), start, end), + data = self.clnt.get(f"/inventory/getInventory.do?" + f"queryparam={qplus(query)}&startIndex={start}&endIndex={end}", timeout=self.request_timeout) return data['netElementList'] self.log.debug('v2 Inventory API Call') - data = self.clnt.get('/inventory/devices?provisioned=%s' % provisioned, + data = self.clnt.get(f"/inventory/devices?provisioned={provisioned}", timeout=self.request_timeout) containers = self.get_containers() for dev in data: @@ -703,8 +735,7 @@ class CvpApi(object): # If any devices did not appear, there is a problem # Join the missing IPs into a string for output missing_ips = ', '.join(device_ips) - raise RuntimeError('Devices {} failed to appear ' - 'in inventory'.format(missing_ips)) + raise RuntimeError(f"Devices {missing_ips} failed to appear in inventory") # Move the devices to their specified containers for device in device_list: @@ -733,12 +764,12 @@ class CvpApi(object): } self.add_devices_to_inventory([device], wait=wait) - def retry_add_to_inventory(self, device_mac, device_ip, username, + def retry_add_to_inventory(self, dev_mac, device_ip, username, password): '''Retry addition of device to Cvp inventory Args: - device_mac (str): MAC address of device + dev_mac (str): MAC address of device device_ip (str): ip address assigned to device username (str): username for device login password (str): password for user @@ -748,7 +779,7 @@ class CvpApi(object): self.get_cvp_info() if self.clnt.apiversion == 1.0: self.log.debug('v1 Inventory API Call') - data = {"key": device_mac, + data = {"key": dev_mac, "ipAddress": device_ip, "userName": username, "password": password} @@ -761,11 +792,11 @@ class CvpApi(object): self.log.warning( 'retry_add_to_inventory: not implemented for v2 APIs') - def delete_device(self, device_mac): + def delete_device(self, dev_mac): '''Delete the device and its pending tasks from Cvp inventory Args: - device_mac (str): mac address of device we are deleting + dev_mac (str): mac address of device we are deleting For CVP 2020 this param is now required to be the device serial number instead of MAC address. This method will handle getting @@ -775,13 +806,13 @@ class CvpApi(object): data (dict): Contains success or failure message ''' self.log.debug('delete_device: called') - return self.delete_devices([device_mac]) + return self.delete_devices([dev_mac]) - def delete_devices(self, device_macs): + def delete_devices(self, dev_macs): '''Delete the device and its pending tasks from Cvp inventory Args: - device_macs (list): list of mac address for + dev_macs (list): list of mac address for devices we're deleting For CVP 2020 this param is now required to be a list of device serial numbers instead @@ -796,7 +827,7 @@ class CvpApi(object): if self.clnt.apiversion is None: self.get_cvp_info() if self.clnt.apiversion < 4.0: - data = {'data': device_macs} + data = {'data': dev_macs} resp = self.clnt.post('/inventory/deleteDevices.do?', data=data, timeout=self.request_timeout) else: @@ -806,7 +837,7 @@ class CvpApi(object): ' when deleting a device. Looking up each devices' 'serial num based on provided MAC addresses') devices = [] - for dev_mac in device_macs: + for dev_mac in dev_macs: device_info = self.get_device_by_mac(dev_mac) if device_info is not None and 'serialNumber' in device_info: devices.append(device_info) @@ -902,7 +933,7 @@ class CvpApi(object): device (dict): The net element device dict for the device if otherwise returns an empty hash. ''' - self.log.debug('get_device_by_name: fqdn: %s' % fqdn) + self.log.debug(f"get_device_by_name: fqdn: {fqdn}") # data = self.get_inventory(start=0, end=0, query=fqdn) data = self.search_topology(fqdn) device = {} @@ -918,23 +949,23 @@ class CvpApi(object): break return device - def get_device_by_mac(self, device_mac): + def get_device_by_mac(self, dev_mac): ''' Returns the net element device dict for the devices mac address. Args: - device_mac (str): MAC Address of the device. + dev_mac (str): MAC Address of the device. Returns: device (dict): The net element device dict for the device if otherwise returns an empty hash. ''' - self.log.debug('get_device_by_mac: MAC address: %s' % device_mac) - # data = self.get_inventory(start=0, end=0, query=device_mac) - data = self.search_topology(device_mac) + self.log.debug(f"get_device_by_mac: MAC address: {dev_mac}") + # data = self.get_inventory(start=0, end=0, query=dev_mac) + data = self.search_topology(dev_mac) device = {} if 'netElementList' in data: for netelem in data['netElementList']: - if netelem['systemMacAddress'] == device_mac: + if netelem['systemMacAddress'] == dev_mac: device = netelem break return device @@ -949,8 +980,7 @@ class CvpApi(object): device (dict): The net element device dict for the device if otherwise returns an empty hash. ''' - self.log.debug('get_device_by_serial: Serial Number: %s' - % device_serial) + self.log.debug(f"get_device_by_serial: Serial Number: {device_serial}") data = self.search_topology(device_serial) device = {} if 'netElementList' in data: @@ -960,53 +990,51 @@ class CvpApi(object): break return device - def get_device_configuration(self, device_mac): + def get_device_configuration(self, dev_mac): ''' Returns the running configuration for the device provided. Args: - device_mac (str): Mac address of the device to get the running + dev_mac (str): Mac address of the device to get the running configuration for. Returns: device (dict): The net element device dict for the device if otherwise returns an empty hash. ''' - self.log.debug('get_device_configuration: device_mac: %s' % device_mac) + self.log.debug(f"get_device_configuration: dev_mac: {dev_mac}") if self.clnt.apiversion is None: self.get_cvp_info() if self.clnt.apiversion < 4.0: - data = self.clnt.get('/inventory/getInventoryConfiguration.do?' - 'netElementId=%s' % device_mac, + data = self.clnt.get(f"/inventory/getInventoryConfiguration.do?netElementId={dev_mac}", timeout=self.request_timeout) else: - data = self.clnt.get('/inventory/device/config?' - 'netElementId=%s' % device_mac, + data = self.clnt.get(f"/inventory/device/config?netElementId={dev_mac}", timeout=self.request_timeout) running_config = '' if 'output' in data: running_config = data['output'] return running_config - def get_device_image_info(self, device_mac): + def get_device_image_info(self, dev_mac): ''' Return a dict of info about a device in CVP. Args: - device_mac (str): Mac address of the device to get the running + dev_mac (str): Mac address of the device to get the running configuration for. Returns: device_image_info (dict): Dict of image info for the device if found. Otherwise returns None. ''' - self.log.debug('Attempt to get net element data for %s' % device_mac) + self.log.debug(f"Attempt to get net element data for {dev_mac}") try: device_image_info = self.clnt.get( - '/provisioning/getNetElementInfoById.do?netElementId=%s' - % qplus(device_mac), timeout=self.request_timeout) + f"/provisioning/getNetElementInfoById.do?netElementId={qplus(dev_mac)}", + timeout=self.request_timeout) except CvpApiError as error: # Catch error when device for provided MAC is not found if 'Invalid Netelement id' in str(error): - self.log.debug('Device with MAC %s not found' % device_mac) + self.log.debug(f"Device with MAC {dev_mac} not found") return None raise error return device_image_info @@ -1029,8 +1057,8 @@ class CvpApi(object): self.get_cvp_info() if self.clnt.apiversion == 1.0: self.log.debug('v1 Inventory API Call') - return self.clnt.get('/inventory/add/searchContainers.do?' - 'startIndex=%d&endIndex=%d' % (start, end)) + return self.clnt.get(f"/inventory/add/searchContainers.do?" + f"startIndex={start}&endIndex={end}") self.log.debug('v2 Inventory API Call') containers = self.clnt.get('/inventory/containers') for container in containers: @@ -1046,9 +1074,8 @@ class CvpApi(object): container['parentId'] = cont['Key'] break else: - self.log.debug( - 'No container parentId found for parentName %s', - full_cont_info['parentName']) + self.log.debug(f"No container parentId found for" + f" parentName {full_cont_info['parentName']}") container['parentId'] = None else: container['parentName'] = None @@ -1069,9 +1096,9 @@ class CvpApi(object): Returns: container (dict): Container info in dictionary format or None ''' - self.log.debug('Get info for container %s' % name) - conts = self.clnt.get('/provisioning/searchTopology.do?queryParam=%s' - '&startIndex=0&endIndex=0' % qplus(name)) + self.log.debug(f"Get info for container {name}") + conts = self.clnt.get(f"/provisioning/searchTopology.do?queryParam={qplus(name)}" + f"&startIndex=0&endIndex=0") if conts['total'] > 0 and conts['containerList']: for cont in conts['containerList']: if cont['name'] == name: @@ -1087,9 +1114,9 @@ class CvpApi(object): Returns: container (dict): Container info in dictionary format or None ''' - self.log.debug('Get info for container %s' % key) - return self.clnt.get('/provisioning/getContainerInfoById.do?' - 'containerId=%s' % qplus(key)) + self.log.debug(f"Get info for container {key}") + return self.clnt.get(f"/provisioning/getContainerInfoById.do?" + f"containerId={qplus(key)}") def get_configlets_by_device_id(self, mac, start=0, end=0): ''' Returns the list of configlets applied to a device. @@ -1104,7 +1131,7 @@ class CvpApi(object): Returns: configlets (list): The list of configlets applied to the device ''' - self.log.debug('get_configlets_by_device: mac: %s' % mac) + self.log.debug(f"get_configlets_by_device: mac: {mac}") data = self.get_configlets_by_netelement_id(mac, start, end) return data['configletList'] @@ -1142,19 +1169,17 @@ class CvpApi(object): if not form: form = [] - self.log.debug('add_configlet_builder: name: %s config: %s form: %s' - % (name, config, form)) + self.log.debug(f"add_configlet_builder: name: {name} config: {config} form: {form}") data = {'name': name, 'data': {'formList': form, 'main_script': {'data': config}}} # Create the configlet builder - self.clnt.post('/configlet/addConfigletBuilder.do?isDraft=%s' % draft, + self.clnt.post(f"/configlet/addConfigletBuilder.do?isDraft={draft}", data=data, timeout=self.request_timeout) # Get the key for the configlet - data = self.clnt.get( - '/configlet/getConfigletByName.do?name=%s' % qplus(name), - timeout=self.request_timeout) + data = self.clnt.get(f"/configlet/getConfigletByName.do?name={qplus(name)}", + timeout=self.request_timeout) return data['key'] def add_configlet(self, name, config): @@ -1167,18 +1192,17 @@ class CvpApi(object): Returns: key (str): The key for the configlet ''' - self.log.debug('add_configlet: name: %s config: %s' % (name, config)) + self.log.debug(f"add_configlet: name: {name} config: {config}") body = {'name': name, 'config': config} # Create the configlet self.clnt.post('/configlet/addConfiglet.do', data=body, timeout=self.request_timeout) # Get the key for the configlet - data = self.clnt.get('/configlet/getConfigletByName.do?name=%s' - % qplus(name), timeout=self.request_timeout) + data = self.clnt.get(f"/configlet/getConfigletByName.do?name={qplus(name)}", + timeout=self.request_timeout) return data['key'] - def delete_configlet(self, name, key): ''' Delete the configlet. @@ -1186,7 +1210,7 @@ class CvpApi(object): name (str): Configlet name key (str): Configlet key ''' - self.log.debug('delete_configlet: name: %s key: %s' % (name, key)) + self.log.debug(f"delete_configlet: name: {name} key: {key}") body = [{'name': name, 'key': key}] # Delete the configlet self.clnt.post('/configlet/deleteConfiglet.do', data=body, @@ -1204,8 +1228,7 @@ class CvpApi(object): Returns: data (dict): Contains success or failure message ''' - self.log.debug('update_configlet: config: %s key: %s name: %s' % - (config, key, name)) + self.log.debug(f"update_configlet: config: {config} key: {key} name: {name}") # Update the configlet body = {'config': config, 'key': key, 'name': name, @@ -1256,21 +1279,18 @@ class CvpApi(object): } } } - debug_str = 'update_configlet_builder:' \ - ' config: {} key: {} name: {} form: {}' - self.log.debug(debug_str.format(config, key, name, form)) + self.log.debug(f"update_configlet_builder: config: {config}" + f" key: {key} name: {name} form: {form}") # Update the configlet builder - url_string = '/configlet/updateConfigletBuilder.do?' \ - 'isDraft={}&id={}&action=save' - return self.clnt.post(url_string.format(draft, key), - data=data, timeout=self.request_timeout) + url_string = f"/configlet/updateConfigletBuilder.do?isDraft={draft}&id={key}&action=save" + return self.clnt.post(url_string, data=data, timeout=self.request_timeout) - def update_reconcile_configlet(self, device_mac, config, key, name, + def update_reconcile_configlet(self, dev_mac, config, key, name, reconciled=False): ''' Update the reconcile configlet. Args: - device_mac (str): Mac address/Key for device whose reconcile + dev_mac (str): Mac address/Key for device whose reconcile configlet is being updated config (str): Reconciled config statements key (str): Reconcile Configlet key @@ -1280,12 +1300,10 @@ class CvpApi(object): Returns: data (dict): Contains success or failure message ''' - log_str = ('update_reconcile_configlet:' - ' device_mac: {} config: {} key: {} name: {}') - self.log.debug(log_str.format(device_mac, config, key, name)) + self.log.debug(f"update_reconcile_configlet: dev_mac: {dev_mac}" + f" config: {config} key: {key} name: {name}") - url_str = ('/provisioning/updateReconcileConfiglet.do?' - 'netElementId={}') + url_str = f"/provisioning/updateReconcileConfiglet.do?netElementId={dev_mac}" body = { 'config': config, 'key': key, @@ -1293,8 +1311,7 @@ class CvpApi(object): 'reconciled': reconciled, 'unCheckedLines': '', } - return self.clnt.post(url_str.format(device_mac), data=body, - timeout=self.request_timeout) + return self.clnt.post(url_str, data=body, timeout=self.request_timeout) def add_note_to_configlet(self, key, note): ''' Add a note to a configlet. @@ -1310,66 +1327,20 @@ class CvpApi(object): return self.clnt.post('/configlet/addNoteToConfiglet.do', data=data, timeout=self.request_timeout) - def sanitize_warnings(self, data): - ''' Sanitize the warnings returned after validation. - - In some cases where the configlets has both errors - and warnings, CVP may split any warnings that have - `,` across multiple strings. - This method concats the strings back into one string - per warning, and correct the warningCount. - - Args: - data (dict): A dict that contians the result - of the validation operation - Returns: - response (dict): A dict that contains the result of the - validation operation - ''' - if "warnings" not in data: - # nothing to do here, we can return as is - return data - # Since there may be warnings incorrectly split on - # ', ' within the warning text by CVP, we join all the - # warnings together using ', ' into one large string - temp_warnings = ", ".join(data['warnings']).strip() - - # To split the large string again we match on the - # 'at line XXX' that should indicate the end of the warning. - # We capture as well the remaining \\n or whitespace and include - # the extra ', ' added in the previous step in the matching criteria. - # The extra ', ' is not included in the strings of the new list - temp_warnings = split( - r'(.*?at line \d+.*?),\s+', - temp_warnings - ) - - # The behaviour of re.split will add empty strings - # if the regex matches on the begging or ending of the line. - # Refer to https://docs.python.org/3/library/re.html#re.split - - # Use filter to remove any empty strings - # that re.split inserted - data['warnings'] = list(filter(None, temp_warnings)) - # Update the count of warnings to the correct value - data['warningCount'] = len(data['warnings']) - return data - - def validate_config_for_device(self, device_mac, config): + def validate_config_for_device(self, dev_mac, config): ''' Validate a config against a device Args: - device_mac (str): Device MAC address + dev_mac (str): Device MAC address config (str): Switch config statements Returns: response (dict): A dict that contains the result of the validation operation ''' - self.log.debug('validate_config_for_device: device_mac: %s config: %s' - % (device_mac, config)) - body = {'netElementId': device_mac, 'config': config} - return self.sanitize_warnings( + self.log.debug(f"validate_config_for_device: dev_mac: {dev_mac} config: {config}") + body = {'netElementId': dev_mac, 'config': config} + return sanitize_warnings( self.clnt.post( '/configlet/validateConfig.do', data=body, @@ -1377,40 +1348,35 @@ class CvpApi(object): ) ) - def validate_config(self, device_mac, config): + def validate_config(self, dev_mac, config): ''' Validate a config against a device and parse response to produce log messages are return a flag for the config validity. Args: - device_mac (str): Device MAC address + dev_mac (str): Device MAC address config (str): Switch config statements Returns: response (boolean): A flag signifying if the config is valid or not. ''' - self.log.debug('validate_config: device_mac: %s config: %s' - % (device_mac, config)) - result = self.validate_config_for_device(device_mac, config) + self.log.debug(f"validate_config: dev_mac: {dev_mac} config: {config}") + result = self.validate_config_for_device(dev_mac, config) validated = True if 'warningCount' in result and result['warnings']: for warning in result['warnings']: - self.log.warning('Validation of config produced warning - %s' - % warning) + self.log.warning(f"Validation of config produced warning - {warning}") if 'errorCount' in result: - self.log.error('Validation of config produced %s errors' - % result['errorCount']) + self.log.error(f"Validation of config produced {result['errorCount']} errors") if 'errors' in result: for error in result['errors']: - self.log.error('Validation of config produced error - %s' - % error) + self.log.error(f"Validation of config produced error - {error}") validated = False if 'result' in result: for item in result['result']: if 'messages' in item: for message in item['messages']: - self.log.info('Validation of config returned' - ' message - %s' % message) + self.log.info(f"Validation of config returned message - {message}") return validated def get_all_temp_actions(self, start=0, end=0): @@ -1425,8 +1391,7 @@ class CvpApi(object): response (dict): A dict that contains a list of the current temp actions. ''' - url = ('/provisioning/getAllTempActions.do?startIndex=%d&endIndex=%d' - % (start, end)) + url = (f"/provisioning/getAllTempActions.do?startIndex={start}&endIndex={end}") data = self.clnt.get(url, timeout=self.request_timeout) return data @@ -1462,7 +1427,7 @@ class CvpApi(object): url = '/provisioning/v2/saveTopology.do' return self.clnt.post(url, data=data, timeout=self.request_timeout) - def apply_configlets_to_device(self, app_name, dev, new_configlets, + def apply_configlets_to_device(self, app_name, dev, new_configlets, # pylint: disable=too-many-locals create_task=True, reorder_configlets=False, validate=False): ''' Apply the configlets to the device. @@ -1497,8 +1462,7 @@ class CvpApi(object): Ex: {u'data': {u'status': u'success', u'taskIds': [u'32']}} ''' - self.log.debug('apply_configlets_to_device: dev: %s names: %s' % - (dev, new_configlets)) + self.log.debug(f"apply_configlets_to_device: dev: {dev} names: {new_configlets}") # Get a list of the names and keys of the configlets cnames = [] ckeys = [] @@ -1516,7 +1480,7 @@ class CvpApi(object): cnames.append(entry['name']) ckeys.append(entry['key']) - info = '%s: Configlet Assign: to Device %s' % (app_name, dev['fqdn']) + info = f"{app_name}: Configlet Assign: to Device {dev['fqdn']}" info_preview = '<b>Configlet Assign:</b> to Device' + dev['fqdn'] data = {'data': [{'info': info, 'infoPreview': info_preview, @@ -1544,16 +1508,17 @@ class CvpApi(object): 'parentTask': ''}]} if validate: validation_result = self.validate_configlets_for_device(dev['systemMacAddress'], ckeys) - data['data'][0].update({ - "configCompareCount": { - "mismatch": validation_result['mismatch'], - "reconcile": validation_result['reconcile'], - "new": validation_result['new'] + data['data'][0].update( + { + "configCompareCount": + { + "mismatch": validation_result['mismatch'], + "reconcile": validation_result['reconcile'], + "new": validation_result['new'] } } ) - self.log.debug('apply_configlets_to_device: saveTopology data:\n%s' % - data['data']) + self.log.debug(f"apply_configlets_to_device: saveTopology data:\n{data['data']}") self._add_temp_action(data) if create_task: return self._save_topology_v2([]) @@ -1583,8 +1548,7 @@ class CvpApi(object): Ex: {u'data': {u'status': u'success', u'taskIds': [u'35']}} ''' - self.log.debug('remove_configlets_from_device: dev: %s names: %s' % - (dev, del_configlets)) + self.log.debug(f"remove_configlets_from_device: dev: {dev} names: {del_configlets}") # Get all the configlets assigned to the device. configlets = self.get_configlets_by_device_id(dev['systemMacAddress']) @@ -1608,7 +1572,7 @@ class CvpApi(object): del_names.append(entry['name']) del_keys.append(entry['key']) - info = '%s Configlet Remove: from Device %s' % (app_name, dev['fqdn']) + info = f"{app_name} Configlet Remove: from Device {dev['fqdn']}" info_preview = '<b>Configlet Remove:</b> from Device' + dev['fqdn'] data = {'data': [{'info': info, 'infoPreview': info_preview, @@ -1635,17 +1599,18 @@ class CvpApi(object): 'childTasks': [], 'parentTask': ''}]} if validate: - validation_result = self.validate_configlets_for_device(dev['systemMacAddress'], keep_keys) - data['data'][0].update({ - "configCompareCount": { - "mismatch": validation_result['mismatch'], - "reconcile": validation_result['reconcile'], - "new": validation_result['new'] + validation_result = self.validate_configlets_for_device(dev['systemMacAddress'], + keep_keys) + data['data'][0].update( + { + "configCompareCount": { + "mismatch": validation_result['mismatch'], + "reconcile": validation_result['reconcile'], + "new": validation_result['new'] } } ) - self.log.debug('remove_configlets_from_device: saveTopology data:\n%s' - % data['data']) + self.log.debug(f"remove_configlets_from_device: saveTopology data:\n{data['data']}") self._add_temp_action(data) if create_task: return self._save_topology_v2([]) @@ -1668,9 +1633,8 @@ class CvpApi(object): Ex: {u'data': {u'status': u'success', u'taskIds': [u'32']}} ''' - self.log.debug( - 'apply_configlets_to_container: container: %s names: %s' % - (container, new_configlets)) + self.log.debug(f"apply_configlets_to_container: container: {container}" + f" names: {new_configlets}") # Get all the configlets assigned to the device. configlets = self.get_configlets_by_container_id(container['key']) @@ -1695,8 +1659,7 @@ class CvpApi(object): cnames.append(entry['name']) ckeys.append(entry['key']) - info = '%s: Configlet Assign: to Container %s' % (app_name, - container['name']) + info = f"{app_name}: Configlet Assign: to Container {container['name']}" info_preview = '<b>Configlet Assign:</b> to Container' + container[ 'name'] data = {'data': [{'info': info, @@ -1723,9 +1686,7 @@ class CvpApi(object): 'nodeTargetIpAddress': '', 'childTasks': [], 'parentTask': ''}]} - self.log.debug( - 'apply_configlets_to_container: saveTopology data:\n%s' % - data['data']) + self.log.debug(f"apply_configlets_to_container: saveTopology data:\n{data['data']}") self._add_temp_action(data) if create_task: return self._save_topology_v2([]) @@ -1750,9 +1711,8 @@ class CvpApi(object): Ex: {u'data': {u'status': u'success', u'taskIds': [u'35']}} ''' - self.log.debug( - 'remove_configlets_from_container: container: %s names: %s' % - (container, del_configlets)) + self.log.debug(f"remove_configlets_from_container: container: {container}" + f" names: {del_configlets}") # Get all the configlets assigned to the device. configlets = self.get_configlets_by_container_id(container['key']) @@ -1776,8 +1736,7 @@ class CvpApi(object): del_names.append(entry['name']) del_keys.append(entry['key']) - info = '%s Configlet Remove: from Container %s' % (app_name, - container['name']) + info = f"{app_name} Configlet Remove: from Container {container['name']}" info_preview = '<b>Configlet Remove:</b> from Container' + container[ 'name'] data = {'data': [{'info': info, @@ -1804,9 +1763,7 @@ class CvpApi(object): 'nodeTargetIpAddress': '', 'childTasks': [], 'parentTask': ''}]} - self.log.debug( - 'remove_configlets_from_container: saveTopology data:\n%s' - % data['data']) + self.log.debug(f"remove_configlets_from_container: saveTopology data:\n{data['data']}") self._add_temp_action(data) if create_task: return self._save_topology_v2([]) @@ -1839,9 +1796,8 @@ class CvpApi(object): "configletId": "string"}, ...] } ''' - self.log.debug('validate_configlets_for_device: ' - 'MAC: %s - conf keys: %s - page_type: %s' % - (mac, configlet_keys, page_type)) + self.log.debug(f"validate_configlets_for_device: " + f"MAC: {mac} - conf keys: {configlet_keys} - page_type: {page_type}") data = {'configIdList': configlet_keys, 'netElementId': mac, 'pageType': page_type} @@ -1858,9 +1814,8 @@ class CvpApi(object): end (int): End index for the pagination. If end index is 0 then all the records will be returned. Default is 0. ''' - return self.clnt.get('/configlet/getAppliedDevices.do?' - 'configletName=%s&startIndex=%d&endIndex=%d' - % (configlet_name, start, end), + return self.clnt.get(f"/configlet/getAppliedDevices.do?" + f"configletName={configlet_name}&startIndex={start}&endIndex={end}", timeout=self.request_timeout) def get_applied_containers(self, configlet_name, start=0, end=0): @@ -1873,9 +1828,8 @@ class CvpApi(object): end (int): End index for the pagination. If end index is 0 then all the records will be returned. Default is 0. ''' - return self.clnt.get('/configlet/getAppliedContainers.do?' - 'configletName=%s&startIndex=%d&endIndex=%d' - % (configlet_name, start, end), + return self.clnt.get(f"/configlet/getAppliedContainers.do?" + f"configletName={configlet_name}&startIndex={start}&endIndex={end}", timeout=self.request_timeout) # pylint: disable=too-many-arguments @@ -1896,8 +1850,7 @@ class CvpApi(object): Ex: {u'data': {u'status': u'success', u'taskIds': []}} ''' - msg = ('%s container %s under container %s' % - (operation, container_name, parent_name)) + msg = (f"{operation} container {container_name} under container {parent_name}") data = {'data': [{'info': msg, 'infoPreview': msg, 'action': operation, @@ -1936,8 +1889,8 @@ class CvpApi(object): Ex: {u'data': {u'status': u'success', u'taskIds': []}} ''' - self.log.debug('add_container: container: %s parent: %s parent_key: %s' - % (container_name, parent_name, parent_key)) + self.log.debug(f"add_container: container: {container_name}" + f" parent: {parent_name} parent_key: {parent_key}") return self._container_op(container_name, 'new_container', parent_name, parent_key, 'add') @@ -1957,10 +1910,9 @@ class CvpApi(object): Ex: {u'data': {u'status': u'success', u'taskIds': []}} ''' - self.log.debug('delete_container: container: %s container_key: %s ' - 'parent: %s parent_key: %s' % - (container_name, container_key, parent_name, - parent_key)) + self.log.debug(f"delete_container: container: {container_name}" + f" container_key: {container_key} parent: {parent_name}" + f" parent_key: {parent_key}") resp = self._container_op(container_name, container_key, parent_name, parent_key, 'delete') # As of CVP version 2020.1 the addTempAction.do API endpoint stopped @@ -1973,26 +1925,24 @@ class CvpApi(object): except CvpApiError as error: if 'Invalid Container id' in error.msg: return resp - else: - raise + raise if still_exists is not None: raise CvpApiError('Container was not deleted. Check for children') return resp - def get_parent_container_for_device(self, device_mac): + def get_parent_container_for_device(self, dev_mac): ''' Add the container to the specified parent. Args: - device_mac (str): Device mac address + dev_mac (str): Device mac address Returns: response (dict): A dict that contains the parent container info ''' - self.log.debug('get_parent_container_for_device: called for %s' - % device_mac) - data = self.clnt.get('/provisioning/searchTopology.do?' - 'queryParam=%s&startIndex=0&endIndex=0' - % device_mac, timeout=self.request_timeout) + self.log.debug(f"get_parent_container_for_device: called for {dev_mac}") + data = self.clnt.get(f"/provisioning/searchTopology.do?" + f"queryParam={dev_mac}&startIndex=0&endIndex=0", + timeout=self.request_timeout) if data['total'] > 0: cont_name = data['netElementContainerList'][0]['containerName'] return self.get_container_by_name(cont_name) @@ -2015,11 +1965,9 @@ class CvpApi(object): Ex: {u'data': {u'status': u'success', u'taskIds': []}} ''' - info = 'Device Add {} to container {} by {}'.format(device['fqdn'], - container['name'], - app_name) - self.log.debug('Attempting to move device %s to container %s' - % (device['fqdn'], container['name'])) + info = f"Device Add {device['fqdn']} to container {container['name']} by {app_name}" + self.log.debug(f"Attempting to move device {device['fqdn']} to" + f" container {container['name']}") if 'parentContainerId' in device: from_id = device['parentContainerId'] else: @@ -2042,8 +1990,7 @@ class CvpApi(object): # pylint: disable=invalid-name except CvpApiError as e: if 'Data already exists' in str(e): - self.log.debug('Device %s already in container %s' - % (device['fqdn'], container)) + self.log.debug(f"Device {device['fqdn']} already in container {container}") if create_task: return self._save_topology_v2([]) return None @@ -2062,20 +2009,35 @@ class CvpApi(object): response (dict): A dict that contains the container and netelement lists. ''' - self.log.debug('search_topology: query: %s start: %d end: %d' % - (query, start, end)) - data = self.clnt.get('/provisioning/searchTopology.do?queryParam=%s&' - 'startIndex=%d&endIndex=%d' - % (qplus(query), start, end), - timeout=self.request_timeout) + self.log.debug(f"search_topology: query: {query} start: {start} end: {end}") + if self.clnt.apiversion is None: + self.get_cvp_info() + if self.clnt.apiversion <= 6.0: + # Original search topology endpoint + req_url = (f"/provisioning/searchTopology.do?queryParam={qplus(query)}&" + f"startIndex={start}&endIndex={end}") + else: + # Newer CVP versions should use the V3 version of search topology endpoint + req_url = (f"/provisioning/v3/searchTopology.do?queryParam={qplus(query)}&" + f"startIndex={start}&endIndex={end}") + + data = self.clnt.get(req_url, timeout=self.request_timeout) if 'netElementList' in data: for device in data['netElementList']: device['status'] = device['deviceStatus'] - device['mlagEnabled'] = device['isMLAGEnabled'] - device['danzEnabled'] = device['isDANZEnabled'] device['parentContainerKey'] = device['parentContainerId'] - device['bootupTimestamp'] = device['bootupTimeStamp'] - device['internalBuild'] = device['internalBuildId'] + if 'isMLAGEnabled' in device: + # original key was mlagEnabled but it changed to isMLAGEnabled + device['mlagEnabled'] = device['isMLAGEnabled'] + elif 'mlagEnabled' in device: + # Key for V3 search topology changes back to mlagEnabled again + device['isMLAGEnabled'] = device['mlagEnabled'] + # Key isDANZEnabled for V3 search topology is no longer in return data. + device['danzEnabled'] = device.get('isDANZEnabled', '') + # Key bootupTimeStamp for V3 search topology is no longer in return data. + device['bootupTimestamp'] = device.get('bootupTimeStamp', '') + # Key internalBuildId for V3 search topology is no longer in return data. + device['internalBuild'] = device.get('internalBuildId', '') return data def filter_topology(self, node_id='root', fmt='topology', @@ -2091,9 +2053,8 @@ class CvpApi(object): end (int): End index for the pagination. If end index is 0 then all the records will be returned. Default is 0. ''' - url = ('/provisioning/filterTopology.do?nodeId=%s&' - 'format=%s&startIndex=%d&endIndex=%d' - % (node_id, fmt, start, end)) + url = (f"/provisioning/filterTopology.do?nodeId={node_id}&" + f"format={fmt}&startIndex={start}&endIndex={end}") return self.clnt.get(url, timeout=self.request_timeout) def check_compliance(self, node_key, node_type): @@ -2110,15 +2071,14 @@ class CvpApi(object): response (dict): A dict that contains the results of the compliance check. ''' - self.log.debug('check_compliance: node_key: %s node_type: %s' % - (node_key, node_type)) + self.log.debug(f"check_compliance: node_key: {node_key} node_type: {node_type}") data = {'nodeId': node_key, 'nodeType': node_type} resp = self.clnt.post('/provisioning/checkCompliance.do', data=data, timeout=self.request_timeout) if self.clnt.apiversion is None: self.get_cvp_info() if self.clnt.apiversion >= 2.0: - if resp['complianceIndication'] == u'': + if resp['complianceIndication'] == '': resp['complianceIndication'] = 'NONE' return resp @@ -2128,7 +2088,7 @@ class CvpApi(object): Args: e_id (str): The event id to be queried. ''' - return self.clnt.get('/event/getEventById.do?eventId=%s' % e_id, + return self.clnt.get(f"/event/getEventById.do?eventId={e_id}", timeout=self.request_timeout) def get_default_snapshot_template(self): @@ -2183,9 +2143,9 @@ class CvpApi(object): ''' # Get the absolute file path to be uploaded image_path = os.path.abspath(filepath) - image_data = open(image_path, 'rb') - response = self.clnt.post('/image/addImage.do', - files={'file': image_data}) + with open(image_path, 'rb') as image_data: + response = self.clnt.post('/image/addImage.do', + files={'file': image_data}) return response def cancel_image(self, image_name): @@ -2214,9 +2174,8 @@ class CvpApi(object): the 'data' key contains a list of images and their info. ''' self.log.debug('Get info about images') - return self.clnt.get('/image/getImages.do?queryparam=&startIndex=%d&' - 'endIndex=%d' % (start, end), - timeout=self.request_timeout) + return self.clnt.get(f"/image/getImages.do?queryparam=&startIndex={start}&" + f"endIndex={end}", timeout=self.request_timeout) def get_image_bundles(self, start=0, end=0): ''' Return a list of all image bundles. @@ -2233,8 +2192,8 @@ class CvpApi(object): ''' self.log.debug('Get image bundles that can be applied to devices or' ' containers') - return self.clnt.get('/image/getImageBundles.do?queryparam=&' - 'startIndex=%d&endIndex=%d' % (start, end), + return self.clnt.get(f"/image/getImageBundles.do?queryparam=&" + f"startIndex={start}&endIndex={end}", timeout=self.request_timeout) def get_image_bundle_by_name(self, name): @@ -2247,14 +2206,14 @@ class CvpApi(object): image bundle (dict): Dict of info specific to the image bundle requested or None if the name requested doesn't exist. ''' - self.log.debug('Attempt to get image bundle %s' % name) + self.log.debug(f"Attempt to get image bundle {name}") try: - image = self.clnt.get('/image/getImageBundleByName.do?name=%s' - % qplus(name), timeout=self.request_timeout) + image = self.clnt.get(f"/image/getImageBundleByName.do?name={qplus(name)}", + timeout=self.request_timeout) except CvpApiError as error: # Catch an invalid task_id error and return None if 'Entity does not exist' in str(error): - self.log.debug('Bundle with name %s does not exist' % name) + self.log.debug(f"Bundle with name {name} does not exist") return None raise error return image @@ -2362,24 +2321,22 @@ class CvpApi(object): Ex: {u'data': {u'status': u'success', u'taskIds': [u'32']}} ''' - self.log.debug('Attempt to apply %s to %s %s' % (image['name'], - id_type, name)) - info = 'Apply image: %s to %s %s' % (image['name'], id_type, name) + self.log.debug(f"Attempt to apply {image['name']} to {id_type} {name}") + info = f"Apply image: {image['name']} to {id_type} {name}" node_id = '' if 'imageBundleKeys' in image: if image['imageBundleKeys']: node_id = image['imageBundleKeys'][0] - self.log.info('Provided image is an image object.' - ' Using first value from imageBundleKeys - %s' - % node_id) + self.log.info(f"Provided image is an image object." + f" Using first value from imageBundleKeys - {node_id}") if 'id' in image: node_id = image['id'] - self.log.info('Provided image is an image bundle object.' - ' Found v1 API id field - %s' % node_id) + self.log.info(f"Provided image is an image bundle object." + f" Found v1 API id field - {node_id}") elif 'key' in image: node_id = image['key'] - self.log.info('Provided image is an image bundle object.' - ' Found v2 API key field - %s' % node_id) + self.log.info(f"Provided image is an image bundle object." + f" Found v2 API key field - {node_id}") data = {'data': [{'info': info, 'infoPreview': info, 'note': '', @@ -2446,23 +2403,22 @@ class CvpApi(object): Ex: {u'data': {u'status': u'success', u'taskIds': [u'32']}} ''' - self.log.debug('Attempt to remove %s from %s' % (image['name'], name)) - info = 'Remove image: %s from %s' % (image['name'], name) + self.log.debug(f"Attempt to remove {image['name']} from {name}") + info = f"Remove image: {image['name']} from {name}" node_id = '' if 'imageBundleKeys' in image: if image['imageBundleKeys']: node_id = image['imageBundleKeys'][0] - self.log.info('Provided image is an image object.' - ' Using first value from imageBundleKeys - %s' - % node_id) + self.log.info(f"Provided image is an image object." + f" Using first value from imageBundleKeys - {node_id}") if 'id' in image: node_id = image['id'] - self.log.info('Provided image is an image bundle object.' - ' Found v1 API id field - %s' % node_id) + self.log.info(f"Provided image is an image bundle object." + f" Found v1 API id field - {node_id}") elif 'key' in image: node_id = image['key'] - self.log.info('Provided image is an image bundle object.' - ' Found v2 API key field - %s' % node_id) + self.log.info(f"Provided image is an image bundle object." + f" Found v2 API key field - {node_id}") data = {'data': [{'info': info, 'infoPreview': info, 'note': '', @@ -2494,7 +2450,7 @@ class CvpApi(object): Returns: change controls (list): The list of change controls ''' - self.log.debug('get_change_controls: query: %s' % query) + self.log.debug(f"get_change_controls: query: {query}") if self.clnt.apiversion is None: self.get_cvp_info() if self.clnt.apiversion >= 3.0: @@ -2505,8 +2461,8 @@ class CvpApi(object): self.log.debug('v2 getChangeControls API Call') data = self.clnt.get( - '/changeControl/getChangeControls.do?searchText=%s' - '&startIndex=%d&endIndex=%d' % (qplus(query), start, end), + f"/changeControl/getChangeControls.do?searchText={qplus(query)}" + f"&startIndex={start}&endIndex={end}", timeout=self.request_timeout) if 'data' not in data: return None @@ -2524,7 +2480,7 @@ class CvpApi(object): Returns: tasks (list): The list of available tasks ''' - self.log.debug('change_control_available_tasks: query: %s' % query) + self.log.debug(f"change_control_available_tasks: query: {query}") if self.clnt.apiversion is None: self.get_cvp_info() if self.clnt.apiversion >= 3.0: @@ -2534,8 +2490,8 @@ class CvpApi(object): self.log.debug('v2 getTasksByStatus API Call') data = self.clnt.get( - '/changeControl/getTasksByStatus.do?searchText=%s' - '&startIndex=%d&endIndex=%d' % (qplus(query), start, end), + f"/changeControl/getTasksByStatus.do?searchText={qplus(query)}" + f"&startIndex={start}&endIndex={end}", timeout=self.request_timeout) if 'data' not in data: return None @@ -2644,9 +2600,8 @@ class CvpApi(object): if self.clnt.apiversion is None: self.get_cvp_info() if self.clnt.apiversion < 3.0: - self.log.debug('Wrong method for API version %s.' - ' Use create_change_control method', - self.clnt.apiversion) + self.log.debug(f"Wrong method for API version {self.clnt.apiversion}." + f" Use create_change_control method") self.log.warning('create_change_control_v3:' ' Use old change control APIs for old versions') return None @@ -2655,7 +2610,7 @@ class CvpApi(object): stages = [] if sequential: for index, task in enumerate(tasks): - stage_id = 'stage%d' % index + stage_id = f"stage{index}" stage = {'stage': [{ 'id': stage_id, 'action': { @@ -2669,7 +2624,7 @@ class CvpApi(object): else: stage_rows = [] for index, task in enumerate(tasks): - stage_id = 'stage%d' % index + stage_id = f"stage{index}" stage_row = { 'id': stage_id, 'action': { @@ -2704,8 +2659,7 @@ class CvpApi(object): Ex: {"data": "success"} ''' - self.log.debug('add_notes_to_change_control: cc_id %s, notes %s' - % (cc_id, notes)) + self.log.debug(f"add_notes_to_change_control: cc_id {cc_id}, notes {notes}") if self.clnt.apiversion is None: self.get_cvp_info() if self.clnt.apiversion >= 3.0: @@ -2760,9 +2714,8 @@ class CvpApi(object): if self.clnt.apiversion is None: self.get_cvp_info() if self.clnt.apiversion < 3.0: - self.log.debug('Approval methods not valid for API version %s.' - ' Functionality did not exist', - self.clnt.apiversion) + self.log.debug(f"Approval methods not valid for API version {self.clnt.apiversion}." + f" Functionality did not exist") return None self.log.debug('v3 Approve change control API Call') @@ -2781,9 +2734,8 @@ class CvpApi(object): if self.clnt.apiversion is None: self.get_cvp_info() if self.clnt.apiversion < 3.0: - self.log.debug('Approval methods not valid for API version %s.' - ' Functionality did not exist', - self.clnt.apiversion) + self.log.debug(f"Approval methods not valid for API version {self.clnt.apiversion}." + f" Functionality did not exist") return None self.log.debug('v3 Delete Approval for change control API Call') @@ -2896,7 +2848,7 @@ class CvpApi(object): 'timeZone': '', 'type': 'Custom'} ''' - self.log.debug('get_change_control_info: %s', cc_id) + self.log.debug(f"get_change_control_info: {cc_id}") if self.clnt.apiversion is None: self.get_cvp_info() if self.clnt.apiversion >= 3.0: @@ -2910,8 +2862,8 @@ class CvpApi(object): self.log.debug('v2 getChangeControlInformation.do API Call') try: resp = self.clnt.get( - '/changeControl/getChangeControlInformation.do?' - 'startIndex=0&endIndex=0&ccId=%s' % cc_id, + f"/changeControl/getChangeControlInformation.do?" + f"startIndex=0&endIndex=0&ccId={cc_id}", timeout=self.request_timeout) except CvpApiError as error: if 'No data found' in error.msg: @@ -2940,14 +2892,13 @@ class CvpApi(object): }, u'state': u'Completed'}}] ''' - self.log.debug('get_change_control_status: %s', cc_id) + self.log.debug(f"get_change_control_status: {cc_id}") if self.clnt.apiversion is None: self.get_cvp_info() if self.clnt.apiversion < 3.0: - self.log.debug('get_change_control_status method not supported' - ' for API version %s. Use old' - ' get_change_control_info method' - % self.clnt.apiversion) + self.log.debug(f"get_change_control_status method not supported" + f" for API version {self.clnt.apiversion}. Use old" + f" get_change_control_info method") return None self.log.debug( @@ -2972,8 +2923,7 @@ class CvpApi(object): Ex: {u'data': {u'status': u'success', u'taskIds': []}} ''' - info = ('App %s resetting device %s and moving it to Undefined' - % (app_name, device['fqdn'])) + info = (f"App {app_name} resetting device {device['fqdn']} and moving it to Undefined") self.log.debug(info) if 'parentContainerId' in device: @@ -3001,8 +2951,7 @@ class CvpApi(object): self._add_temp_action(data) except CvpApiError as error: if 'Data already exists' in str(error): - self.log.debug('Device %s already in container Undefined' - % device['fqdn']) + self.log.debug(f"Device {device['fqdn']} already in container Undefined") if create_task: return self._save_topology_v2([]) return None @@ -3028,7 +2977,7 @@ class CvpApi(object): Ex: {u'data': {u'status': u'success', u'taskIds': [u'32']}} ''' - info = 'Deploy device %s to container %s' % (device['fqdn'], container) + info = f"Deploy device {device['fqdn']} to container {container}" self.log.debug(info) container_info = self.get_container_by_name(container) # Add action for moving device to specified container @@ -3037,8 +2986,8 @@ class CvpApi(object): # Get proposed configlets device will inherit from container it is # being moved to. - prop_conf = self.clnt.get('/provisioning/getTempConfigsByNetElementId.' - 'do?netElementId=%s' % device['key']) + prop_conf = self.clnt.get(f"/provisioning/getTempConfigsByNetElementId." + f"do?netElementId={device['key']}") new_configlets = prop_conf['proposedConfiglets'] if configlets: new_configlets.extend(configlets) @@ -3061,30 +3010,33 @@ class CvpApi(object): devices (list): list of device Serial Numbers for which the token should be generated. The default is all devices. duration (string): the token's validity time (max 1 month), - accepted formats are: "24h", "86400s", "60m" + accepted formats for the legacy endpoint: "24h", "86400s", "60m" + accepted format for the new endpoint: "86400s" (only seconds) Returns: response (list) on CVaaS: A list that contains the generated enrollment token. Ex: [{'enrollmentToken':{'token': <token>, 'groups': [], 'reenrollDevices': <devices list>, - 'validFor': <duration e.g 24h>, 'field_mask': None}}] + 'validFor': <duration e.g 86400s>, 'field_mask': None}}] response (dict) on CV on-prem: A dictionary that contains the generated enrollment token. Ex: {'data': <token>} ''' + endpoint_legacy = '/cvpservice/enroll/createToken' + endpoint = '/api/resources/admin.Enrollment/AddEnrollmentToken' if not devices: devices = ["*"] # For on-prem check the version as it is only supported from 2021.2.0+ if not self.clnt.is_cvaas: if self.clnt.apiversion is None: self.get_cvp_info() + # TODO: update this check when 2024.2.0 is released if self.clnt.apiversion >= 6.0: self.log.debug('v6 /cvpservice/enroll/createToken') data = {"reenrollDevices": devices, "duration": duration} - return self.clnt.post('/cvpservice/enroll/createToken', - data=data, timeout=self.request_timeout) + return self.clnt.post(endpoint_legacy, data=data, timeout=self.request_timeout) self.log.warning( 'Enrollment Tokens only supported on CVP 2021.2.0+') return None @@ -3092,9 +3044,7 @@ class CvpApi(object): "enrollmentToken": {"reenrollDevices": devices, "validFor": duration} } - return self.clnt.post( - '/api/v3/services/admin.Enrollment/AddEnrollmentToken', - data=data, timeout=self.request_timeout) + return self.clnt.post(endpoint, data=data, timeout=self.request_timeout) def get_all_tags(self, element_type='ELEMENT_TYPE_UNSPECIFIED', workspace_id=''): ''' Get all device and/or interface tags from the mainline workspace or all other workspaces @@ -3121,8 +3071,9 @@ class CvpApi(object): } ] } - self.log.debug('v6 {}'.format(tag_url)) + self.log.debug(f"v6 {tag_url}") return self.clnt.post(tag_url, data=payload) + return None def get_tag_edits(self, workspace_id): ''' Show all tags edits in a workspace @@ -3151,6 +3102,7 @@ class CvpApi(object): } self.log.debug('v6 ' + tag_url + ' ' + str(payload)) return self.clnt.post(tag_url, data=payload) + return None def get_tag_assignment_edits(self, workspace_id): ''' Show all tags assignment edits in a workspace @@ -3179,6 +3131,7 @@ class CvpApi(object): } self.log.debug('v6 ' + tag_url + ' ' + str(payload)) return self.clnt.post(tag_url, data=payload) + return None def tag_config(self, element_type, workspace_id, tag_label, tag_value, remove=False): ''' Create/Delete device or interface tags. @@ -3213,8 +3166,9 @@ class CvpApi(object): }, "remove": remove } - self.log.debug('v6 {} '.format(tag_url) + str(payload)) + self.log.debug(f"v6 {tag_url} " + str(payload)) return self.clnt.post(tag_url, data=payload) + return None def tag_assignment_config(self, element_type, workspace_id, tag_label, tag_value, device_id, interface_id, remove=False): @@ -3257,8 +3211,9 @@ class CvpApi(object): }, "remove": remove } - self.log.debug('v6 {} '.format(tag_url) + str(payload)) + self.log.debug(f"v6 {tag_url} " + str(payload)) return self.clnt.post(tag_url, data=payload) + return None def get_all_workspaces(self): ''' Get state information for all workspaces @@ -3271,8 +3226,9 @@ class CvpApi(object): if self.cvp_version_compare('>=', 6.0, msg): workspace_url = '/api/resources/workspace/v1/Workspace/all' payload = {} - self.log.debug('v6 {}'.format(workspace_url)) + self.log.debug(f"v6 {workspace_url}") return self.clnt.post(workspace_url, data=payload) + return None def get_workspace(self, workspace_id): ''' Get state information for all workspaces @@ -3283,10 +3239,10 @@ class CvpApi(object): msg = 'Workspace Resource APIs are supported from 2021.2.0 or newer.' # For on-prem check the version as it is only supported from 2021.2.0+ if self.cvp_version_compare('>=', 6.0, msg): - workspace_url = '/api/resources/workspace/v1/Workspace?key.workspaceId={}'.format( - workspace_id) - self.log.debug('v6 {}'.format(workspace_url)) + workspace_url = f"/api/resources/workspace/v1/Workspace?key.workspaceId={workspace_id}" + self.log.debug(f"v6 {workspace_url}") return self.clnt.get(workspace_url) + return None def workspace_config(self, workspace_id, display_name, description='', request='REQUEST_UNSPECIFIED', @@ -3333,6 +3289,7 @@ class CvpApi(object): } self.log.debug('v6 ' + str(workspace_url) + ' ' + str(payload)) return self.clnt.post(workspace_url, data=payload) + return None def workspace_build_status(self, workspace_id, build_id): ''' Verify the state of the workspace build process. @@ -3349,10 +3306,11 @@ class CvpApi(object): msg = 'Workspace Resource APIs are supported from 2021.2.0 or newer.' # For on-prem check the version as it is only supported from 2021.2.0+ if self.cvp_version_compare('>=', 6.0, msg): - params = 'key.workspaceId={}&key.buildId={}'.format(workspace_id, build_id) + params = f"key.workspaceId={workspace_id}&key.buildId={build_id}" workspace_url = '/api/resources/workspace/v1/WorkspaceBuild?' + params - self.log.debug('v6 {}'.format(workspace_url + params)) + self.log.debug(f"v6 {workspace_url + params}") return self.clnt.get(workspace_url, timeout=self.request_timeout) + return None def change_control_get_one(self, cc_id, cc_time=None): ''' Get the configuration and status of a change control using Resource APIs. @@ -3380,11 +3338,11 @@ class CvpApi(object): # For on-prem check the version as it is only supported from 2021.2.0+ if self.cvp_version_compare('>=', 6.0, msg): if cc_time is None: - params = 'key.id={}'.format(cc_id) + params = f"key.id={cc_id}" else: - params = 'key.id={}&time={}'.format(cc_id, cc_time) + params = f"key.id={cc_id}&time={cc_time}" cc_url = '/api/resources/changecontrol/v1/ChangeControl?' + params - self.log.debug('v6 {}'.format(cc_url)) + self.log.debug(f"v6 {cc_url}") try: response = self.clnt.get(cc_url, timeout=self.request_timeout) except Exception as error: @@ -3392,6 +3350,7 @@ class CvpApi(object): return None raise error return response + return None def change_control_get_all(self): ''' Get the configuration and status of all Change Controls using Resource APIs. @@ -3404,8 +3363,9 @@ class CvpApi(object): # For on-prem check the version as it is only supported from 2021.2.0+ if self.cvp_version_compare('>=', 6.0, msg): cc_url = '/api/resources/changecontrol/v1/ChangeControl/all' - self.log.debug('v6 {}'.format(cc_url)) + self.log.debug(f"v6 {cc_url}") return self.clnt.get(cc_url, timeout=self.request_timeout) + return None def change_control_approval_get_one(self, cc_id, cc_time=None): ''' Get the state of a specific Change Control's approve config using Resource APIs. @@ -3426,9 +3386,9 @@ class CvpApi(object): # For on-prem check the version as it is only supported from 2021.2.0+ if self.cvp_version_compare('>=', 6.0, msg): if cc_time is None: - params = 'key.id={}'.format(cc_id) + params = f"key.id={cc_id}" else: - params = 'key.id={}&time={}'.format(cc_id, cc_time) + params = f"key.id={cc_id}&time={cc_time}" cc_url = '/api/resources/changecontrol/v1/ApproveConfig?' + params cc_status = self.change_control_get_one(cc_id) if cc_status is None: @@ -3439,6 +3399,7 @@ class CvpApi(object): " state to be populated.") return None return self.clnt.get(cc_url, timeout=self.request_timeout) + return None def change_control_approval_get_all(self): ''' Get state information for all Change Control Approvals using Resource APIs. @@ -3451,8 +3412,9 @@ class CvpApi(object): # For on-prem check the version as it is only supported from 2021.2.0+ if self.cvp_version_compare('>=', 6.0, msg): cc_url = '/api/resources/changecontrol/v1/ApproveConfig/all' - self.log.debug('v6 {}'.format(cc_url)) + self.log.debug(f"v6 {cc_url}") return self.clnt.get(cc_url, timeout=self.request_timeout) + return None def change_control_approve(self, cc_id, notes="", approve=True): ''' Approve/Unapprove a change control using Resource APIs. @@ -3498,10 +3460,11 @@ class CvpApi(object): msg = 'Change Control Resource APIs are supported from 2021.2.0 or newer.' # For on-prem check the version as it is only supported from 2021.2.0+ if self.cvp_version_compare('>=', 6.0, msg): - params = 'key.id={}'.format(cc_id) + params = f"key.id={cc_id}" cc_url = '/api/resources/changecontrol/v1/ChangeControlConfig?' + params - self.log.debug('v6 {}'.format(cc_url)) + self.log.debug(f"v6 {cc_url}") return self.clnt.delete(cc_url, timeout=self.request_timeout) + return None def change_control_create_with_custom_stages(self, custom_cc=None): ''' Create a Change Control with custom stage hierarchy using Resource APIs. @@ -3636,6 +3599,7 @@ class CvpApi(object): cc_url = '/api/resources/changecontrol/v1/ChangeControlConfig' self.log.debug('v6 ' + str(cc_url) + ' ' + str(payload)) return self.clnt.post(cc_url, data=payload) + return None def change_control_create_for_tasks(self, cc_id, name, tasks, series=True): ''' Create a simple Change Control for tasks using Resource APIs. @@ -3659,7 +3623,7 @@ class CvpApi(object): stages = {'values': {'root': {'name': 'root', 'rows': {'values': []}}}} if series: for index, task in enumerate(tasks): - stage_id = 'stage%d' % index + stage_id = f"stage{index}" stages['values']['root']['rows']['values'].append({'values': [stage_id]}) stages['values'][stage_id] = { 'action': { @@ -3676,7 +3640,7 @@ class CvpApi(object): else: stages['values']['root']['rows']['values'].append({'values': []}) for index, task in enumerate(tasks): - stage_id = 'stage%d' % index + stage_id = f"stage{index}" stages['values']['root']['rows']['values'][0]['values'].append(stage_id) stages['values'][stage_id] = { 'action': { @@ -3707,6 +3671,7 @@ class CvpApi(object): cc_url = '/api/resources/changecontrol/v1/ChangeControlConfig' self.log.debug('v6 ' + str(cc_url) + ' ' + str(payload)) return self.clnt.post(cc_url, data=payload, timeout=self.request_timeout) + return None def change_control_start(self, cc_id, notes=""): ''' Start a Change Control using Resource APIs. @@ -3734,6 +3699,7 @@ class CvpApi(object): cc_url = '/api/resources/changecontrol/v1/ChangeControlConfig' self.log.debug('v6 ' + str(cc_url) + ' ' + str(payload)) return self.clnt.post(cc_url, data=payload, timeout=self.request_timeout) + return None def change_control_stop(self, cc_id, notes=""): ''' Stop a Change Control using Resource APIs. @@ -3762,6 +3728,7 @@ class CvpApi(object): cc_url = '/api/resources/changecontrol/v1/ChangeControlConfig' self.log.debug('v6 ' + str(cc_url) + ' ' + str(payload)) return self.clnt.post(cc_url, data=payload, timeout=self.request_timeout) + return None def change_control_schedule(self, cc_id, schedule_time, notes=""): ''' Schedule a Change Control using Resource APIs. @@ -3793,6 +3760,7 @@ class CvpApi(object): cc_url = '/api/resources/changecontrol/v1/ChangeControlConfig' self.log.debug('v8 ' + str(cc_url) + ' ' + str(payload)) return self.clnt.post(cc_url, data=payload, timeout=self.request_timeout) + return None def device_decommissioning(self, device_id, request_id): ''' Decommission a device using Resource APIs. @@ -3802,7 +3770,7 @@ class CvpApi(object): request_id (string): Key identifies the request to decommission the device. Recommended to generate uuid with str(uuid.uuid4()). Returns: - response (dict): Returns None if the device is not found else returns A dict that contains... + response (dict): Returns None if device is not found else return dict that contains Ex: {'value': {'key': {'requestId': '4a4ba5a2-9886-4cd5-84d6-bdaf85a9f091'}, 'deviceId': 'BAD032986065E8DC14CBB6472EC314A6'}, 'time': '2022-02-12T02:58:30.765459650Z'} @@ -3827,10 +3795,9 @@ class CvpApi(object): self.log.debug('v7 ' + str(url) + ' ' + str(payload)) return self.clnt.post(url, data=payload, timeout=self.request_timeout) else: - self.log.warning( - 'Device with %s serial number does not exist (or is not registered) to decommission' - % device_id) - return None + self.log.warning("Device with %s serial number does not exist (or is not registered)" + " to decommission", device_id) + return None def device_decommissioning_status_get_one(self, request_id): ''' Get the decommission status of a device using Resource APIs. @@ -3847,10 +3814,11 @@ class CvpApi(object): msg = 'Decommissioning via Resource APIs are supported from 2021.3.0 or newer.' # For on-prem check the version as it is only supported from 2021.3.0+ if self.cvp_version_compare('>=', 7.0, msg): - params = 'key.requestId={}'.format(request_id) + params = f"key.requestId={request_id}" url = '/api/resources/inventory/v1/DeviceDecommissioning?' + params self.log.debug('v7 ' + str(url)) return self.clnt.get(url, timeout=self.request_timeout) + return None def device_decommissioning_status_get_all(self, status="DECOMMISSIONING_STATUS_UNSPECIFIED"): ''' Get the decommissioning status of all devices using Resource APIs. @@ -3882,6 +3850,7 @@ class CvpApi(object): url = '/api/resources/inventory/v1/DeviceDecommissioning/all' self.log.debug('v7 ' + str(url)) return self.clnt.post(url, data=payload, timeout=self.request_timeout) + return None def add_role(self, name, description, moduleList): ''' Add new local role to the CVP UI. @@ -3918,9 +3887,10 @@ class CvpApi(object): rolekey (str): role key on CVP Returns: response (dict): Returns a dict that contains the role. - Ex: {'name': 'Test Role', 'key': 'role_1599019487020581247', 'description': 'Test'...} + Ex: {'name': 'Test Role', 'key': 'role_1599019487020581247', + 'description': 'Test'...} ''' - return self.clnt.get('/role/getRole.do?roleId={}'.format(rolekey), + return self.clnt.get(f"/role/getRole.do?roleId={rolekey}", timeout=self.request_timeout) def get_roles(self): @@ -3962,8 +3932,9 @@ class CvpApi(object): msg = 'Service Account Resource APIs are supported from 2021.3.0+.' if self.cvp_version_compare('>=', 7.0, msg): url = '/api/v3/services/arista.serviceaccount.v1.TokenService/GetAll' - self.log.debug('v7 {}'.format(url)) + self.log.debug(f"v7 {url}") return self.clnt.post(url) + return None def svc_account_token_get_one(self, token_id): ''' Get a service account token's state using Resource APIs @@ -3979,8 +3950,9 @@ class CvpApi(object): if self.cvp_version_compare('>=', 7.0, msg): payload = {"key": {"id": token_id}} url = '/api/v3/services/arista.serviceaccount.v1.TokenService/GetOne' - self.log.debug('v7 {} {}'.format(url, payload)) + self.log.debug(f"v7 {url} {payload}") return self.clnt.post(url, data=payload) + return None def svc_account_token_delete(self, token_id): ''' Delete a service account token using Resource APIs. @@ -3996,8 +3968,9 @@ class CvpApi(object): if self.cvp_version_compare('>=', 7.0, msg): payload = {"key": {"id": token_id}} url = '/api/v3/services/arista.serviceaccount.v1.TokenConfigService/Delete' - self.log.debug('v7 {} {}'.format(url, payload)) + self.log.debug(f"v7 {url} {payload}") return self.clnt.post(url, data=payload) + return None def svc_account_token_set(self, username, duration, description): ''' Create a service account token using Resource APIs. @@ -4020,8 +3993,9 @@ class CvpApi(object): msg = 'Service Account Resource APIs are supported from 2021.3.0+.' if self.cvp_version_compare('>=', 7.0, msg): url = '/api/v3/services/arista.serviceaccount.v1.TokenConfigService/Set' - self.log.debug('v7 {} {}'.format(url, payload)) + self.log.debug(f"v7 {url} {payload}") return self.clnt.post(url, data=payload) + return None def svc_account_get_all(self): ''' Get all service account states using Resource APIs. @@ -4036,8 +4010,9 @@ class CvpApi(object): msg = 'Service Account Resource APIs are supported from 2021.3.0+.' if self.cvp_version_compare('>=', 7.0, msg): url = '/api/v3/services/arista.serviceaccount.v1.AccountService/GetAll' - self.log.debug('v7 {} '.format(url)) + self.log.debug(f"v7 {url}") return self.clnt.post(url) + return None def svc_account_get_one(self, username): ''' Get a service account's state using Resource APIs @@ -4054,8 +4029,9 @@ class CvpApi(object): if self.cvp_version_compare('>=', 7.0, msg): payload = {"key": {"name": username}} url = '/api/v3/services/arista.serviceaccount.v1.AccountService/GetOne' - self.log.debug('v7 {} {}'.format(url, payload)) + self.log.debug(f"v7 {url} {payload}") return self.clnt.post(url, data=payload) + return None def svc_account_set(self, username, description, roles, status): ''' Create a service account using Resource APIs. @@ -4087,16 +4063,17 @@ class CvpApi(object): if role['key'] in roles or role['name'] in roles: role_ids.append(role['key']) if len(roles) != len(role_ids): - self.log.warning('Not all provided roles {} are valid. ' - 'Only using the found valid roles {}'.format(roles, role_ids)) + self.log.warning(f"Not all provided roles {roles} are valid. " + f"Only using the found valid roles {role_ids}") payload = {'value': {'description': description, 'groups': {'values': role_ids}, 'key': {'name': username}, 'status': status}} url = '/api/v3/services/arista.serviceaccount.v1.AccountConfigService/Set' - self.log.debug('v7 {} {}'.format(url, payload)) + self.log.debug(f"v7 {url} {payload}") return self.clnt.post(url, data=payload) + return None def svc_account_delete(self, username): ''' Delete a service account using Resource APIs. @@ -4112,8 +4089,9 @@ class CvpApi(object): if self.cvp_version_compare('>=', 7.0, msg): payload = {"key": {"name": username}} url = '/api/v3/services/arista.serviceaccount.v1.AccountConfigService/Delete' - self.log.debug('v7 {} {}'.format(url, payload)) + self.log.debug(f"v7 {url} {payload}") return self.clnt.post(url, data=payload) + return None def svc_account_delete_expired_tokens(self): ''' Delete all service account tokens using Resource APIs. diff --git a/cvprac/cvp_client.py b/cvprac/cvp_client.py index 602f21a..a02676f 100644 --- a/cvprac/cvp_client.py +++ b/cvprac/cvp_client.py @@ -29,6 +29,9 @@ # OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN # IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # + +# pylint: disable=too-many-branches,too-many-statements,too-many-locals,too-many-lines + ''' RESTful API Client class for Cloudvision(R) Portal This module provides a RESTful API client for Cloudvision(R) Portal (CVP) @@ -96,18 +99,24 @@ import json import logging from logging.handlers import SysLogHandler from itertools import cycle -from pkg_resources import parse_version +from packaging.version import parse import requests -from requests.exceptions import ConnectionError, HTTPError, Timeout, \ - ReadTimeout, TooManyRedirects, JSONDecodeError +from requests.exceptions import ( # pylint: disable=redefined-builtin + ConnectionError, + HTTPError, + Timeout, + ReadTimeout, + TooManyRedirects, + JSONDecodeError +) from cvprac.cvp_api import CvpApi from cvprac.cvp_client_errors import CvpApiError, CvpLoginError, \ CvpRequestError, CvpSessionLogOutError -class CvpClient(object): +class CvpClient(): ''' Use this class to create a persistent connection to CVP. ''' # pylint: disable=too-many-instance-attributes @@ -233,28 +242,37 @@ class CvpClient(object): ' Appending 0. Updated Version String - %s', ".".join(version_components)) full_version = ".".join(version_components) - if parse_version(full_version) >= parse_version('2023.1.0'): + if parse(full_version) >= parse('2024.1.0'): + self.log.info('Setting API version to v12') + self.apiversion = 12.0 + elif parse(full_version) >= parse('2023.3.0'): + self.log.info('Setting API version to v11') + self.apiversion = 11.0 + elif parse(full_version) >= parse('2023.2.0'): + self.log.info('Setting API version to v10') + self.apiversion = 10.0 + elif parse(full_version) >= parse('2023.1.0'): self.log.info('Setting API version to v9') self.apiversion = 9.0 - elif parse_version(full_version) >= parse_version('2022.1.0'): + elif parse(full_version) >= parse('2022.1.0'): self.log.info('Setting API version to v8') self.apiversion = 8.0 - elif parse_version(full_version) >= parse_version('2021.3.0'): + elif parse(full_version) >= parse('2021.3.0'): self.log.info('Setting API version to v7') self.apiversion = 7.0 - elif parse_version(full_version) >= parse_version('2021.2.0'): + elif parse(full_version) >= parse('2021.2.0'): self.log.info('Setting API version to v6') self.apiversion = 6.0 - elif parse_version(full_version) >= parse_version('2020.2.4'): + elif parse(full_version) >= parse('2020.2.4'): self.log.info('Setting API version to v5') self.apiversion = 5.0 - elif parse_version(full_version) >= parse_version('2020.1.1'): + elif parse(full_version) >= parse('2020.1.1'): self.log.info('Setting API version to v4') self.apiversion = 4.0 - elif parse_version(full_version) >= parse_version('2019.0.0'): + elif parse(full_version) >= parse('2019.0.0'): self.log.info('Setting API version to v3') self.apiversion = 3.0 - elif parse_version(full_version) >= parse_version('2018.2.0'): + elif parse(full_version) >= parse('2018.2.0'): self.log.info('Setting API version to v2') self.apiversion = 2.0 else: @@ -378,13 +396,12 @@ class CvpClient(object): self.error_msg = '\n' for _ in range(0, num_nodes): host = next(self.node_pool) - self.url_prefix = ('https://%s:%d/web' % (host, self.port or 443)) - self.url_prefix_short = ('https://%s:%d' - % (host, self.port or 443)) + self.url_prefix = f"https://{host}:{self.port or 443}/web" + self.url_prefix_short = f"https://{host}:{self.port or 443}" error = self._reset_session() if error is None: break - self.error_msg += '%s: %s\n' % (host, error) + self.error_msg += f"{host}: {error}\n" def _reset_session(self): ''' Get a new request session and try logging into the current @@ -428,23 +445,20 @@ class CvpClient(object): if 'Unauthorized' in response.reason: # Check for 'Unauthorized' User error because this is how # CVP responds to a logged out users requests in 2018.x. - msg = '%s: Request Error: %s' % (prefix, response.reason) + msg = f"{prefix}: Request Error: {response.reason}" self.log.error(msg) raise CvpApiError(msg) if 'User is unauthorized' in response.text: # Check for 'User is unauthorized' response text because this # is how CVP responds to a logged out users requests in 2019.x. - msg = '%s: Request Error: User is unauthorized' % prefix + msg = f"{prefix}: Request Error: User is unauthorized" self.log.error(msg) raise CvpApiError(msg) - else: - msg = '%s: Request Error: %s - %s' % (prefix, response.reason, - response.text) - self.log.error(msg) - raise CvpRequestError(msg) + msg = f"{prefix}: Request Error: {response.reason} - {response.text}" + raise CvpRequestError(msg) if 'LOG OUT MESSAGE' in response.text: - msg = ('%s: Request Error: session logged out' % prefix) + msg = f"{prefix}: Request Error: session logged out" raise CvpSessionLogOutError(msg) joutput = json_decoder(response.text) @@ -460,9 +474,9 @@ class CvpClient(object): # Build the error message from all the errors. err_msg = error_list[0] for idx in range(1, len(error_list)): - err_msg = '%s\n%s' % (err_msg, error_list[idx]) + err_msg = f"{err_msg}\n{error_list[idx]}" - msg = ('%s: Request Error: %s' % (prefix, err_msg)) + msg = f"{prefix}: Request Error: {err_msg}" self.log.error(msg) raise CvpApiError(msg) @@ -477,8 +491,7 @@ class CvpClient(object): response status is not OK. ''' if not response.ok: - msg = '%s: Request Error: %s - %s' % (prefix, response.reason, - response.text) + msg = f"{prefix}: Request Error: {response.reason} - {response.text}" self.log.error(msg) raise CvpRequestError(msg) @@ -512,7 +525,7 @@ class CvpClient(object): self.headers.pop('APP_SESSION_ID', None) if self.api_token is not None: return self._set_headers_api_token() - elif self.is_cvaas: + if self.is_cvaas: raise CvpLoginError('CVaaS only supports API token authentication.' ' Please create an API token and provide it' ' via the api_token parameter in combination' @@ -551,7 +564,7 @@ class CvpClient(object): headers=self.headers, timeout=self.connect_timeout, verify=self.cert) - self._is_good_response(response, 'Authenticate: %s' % url) + self._is_good_response(response, f"Authenticate: {url}") self.cookies = response.cookies self.headers['APP_SESSION_ID'] = response.json()['sessionId'] @@ -561,18 +574,20 @@ class CvpClient(object): ''' # If using an API token there is no need to run a Login API. # Simply add the token into the headers or cookies - self.headers['Authorization'] = 'Bearer %s' % self.api_token + self.headers['Authorization'] = f"Bearer {self.api_token}" # Alternative to adding token to headers it can be added to # cookies as shown below. # self.cookies = {'access_token': self.api_token} url = self.url_prefix_short + '/api/v1/rest/' - response = self.session.get(url, - cookies=self.cookies, - headers=self.headers, - timeout=self.connect_timeout, - verify=self.cert) + response = self.session.get( + url, + cookies=self.cookies, + headers=self.headers, + timeout=self.connect_timeout, + verify=self.cert + ) # Verify that the generic request was successful - self._is_good_response(response, 'Authenticate: %s' % url) + self._is_good_response(response, f"Authenticate: {url}") def logout(self): ''' @@ -584,7 +599,7 @@ class CvpClient(object): self.log.info('User logged out.') self.session = None else: - err = 'Error trying to logout %s' % response + err = f"Error trying to logout {response}" self.log.error(err) def _make_request(self, req_type, url, timeout, data=None, @@ -700,8 +715,8 @@ class CvpClient(object): try: resp_data = response.json() - if (resp_data is not None and 'result' in resp_data - and '/resources/' in full_url): + if (resp_data is not None and 'result' in resp_data and + '/resources/' in full_url): # Resource APIs use JSON streaming and will return # multiple JSON objects during GetAll type API # calls. We are wrapping the multiple objects into @@ -725,10 +740,8 @@ class CvpClient(object): ' response data. Attempt to decode') decoded_data = json_decoder(response.text) return {'data': decoded_data} - else: - self.log.error('Unknown format for JSONDecodeError - %s', - err_str) - raise error + self.log.error("Unknown format for JSONDecodeError - %s", err_str) + raise error def _send_request(self, req_type, full_url, timeout, data=None, files=None): @@ -795,7 +808,7 @@ class CvpClient(object): timeout=timeout, verify=self.cert) else: - fhs = dict() + fhs = {} fhs['Accept'] = self.headers['Accept'] if 'APP_SESSION_ID' in self.headers: fhs['APP_SESSION_ID'] = self.headers[ @@ -830,8 +843,7 @@ class CvpClient(object): continue try: - self._is_good_response(response, '%s: %s ' % - (req_type, full_url)) + self._is_good_response(response, f"{req_type}: {full_url} ") except CvpSessionLogOutError as error: self.log.debug(error) # Retry the request to the same node if there was a CVP session @@ -840,11 +852,10 @@ class CvpClient(object): # be retried on the same node. if req_try + 1 == self.NUM_RETRY_REQUESTS: raise error - else: - self._reset_session() - if not self.session: - raise error - continue + self._reset_session() + if not self.session: + raise error + continue except CvpApiError as error: self.log.debug(error) if ('Unauthorized' in error.msg or @@ -859,14 +870,12 @@ class CvpClient(object): # will be retried on the same node. if req_try + 1 == self.NUM_RETRY_REQUESTS: raise error - else: - self._reset_session() - if not self.session: - raise error - continue - else: - # pylint: disable=raising-bad-type - raise error + self._reset_session() + if not self.session: + raise error + continue + # pylint: disable=raising-bad-type + raise error return response def get(self, url, timeout=30): diff --git a/dev-requirements.txt b/dev-requirements.txt index 2395cb7..a0adc90 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,6 +1,5 @@ check-manifest coverage -mock pdoc pep8 pyflakes diff --git a/docs/labs/lab06-provisioning/vc_task_retrigger.py b/docs/labs/lab06-provisioning/vc_task_retrigger.py index b5586ee..cfee3a1 100644 --- a/docs/labs/lab06-provisioning/vc_task_retrigger.py +++ b/docs/labs/lab06-provisioning/vc_task_retrigger.py @@ -7,7 +7,7 @@ import argparse import ssl import sys -from pkg_resources import parse_version +from packaging.version import parse from getpass import getpass from cvprac.cvp_client import CvpClient import requests.packages.urllib3 @@ -56,7 +56,7 @@ def main(): # Get the current CVP version cvp_release = clnt.api.get_cvp_info()['version'] - if parse_version(cvp_release) < parse_version('2020.3.0'): + if parse(cvp_release) < parse('2020.3.0'): # For older CVP, we manually trigger a compliance check try: clnt.api.check_compliance('root', 'container') diff --git a/docs/release-notes-1.4.0.rst b/docs/release-notes-1.4.0.rst new file mode 100644 index 0000000..b6c81fb --- /dev/null +++ b/docs/release-notes-1.4.0.rst @@ -0,0 +1,20 @@ +###### +v1.4.0 +###### + +2024-5-6 + +Enhancements +^^^^^^^^^^^^ + +* Move from pkg_resources to packaging for Python 3.12 support. (`271 <https://github.com/aristanetworks/cvprac/pull/271>`_) [`mharista <https://github.com/mharista>`_] +* Add support for searchTopology V3 endpoint. (`275 <https://github.com/aristanetworks/cvprac/pull/275>`_) [`mharista <https://github.com/mharista>`_] + +Fixed +^^^^^ + +* Add missing url encoding for get_user. (`264 <https://github.com/aristanetworks/cvprac/pull/264>`_) [`noredistribution <https://github.com/noredistribution>`_] +* Updated the enrollment endpoint for CVaaS. (`269 <https://github.com/aristanetworks/cvprac/pull/269>`_) [`noredistribution <https://github.com/noredistribution>`_] +* Add timeout to get_configlets_and_mappers(). (`270 <https://github.com/aristanetworks/cvprac/pull/270>`_) [`noredistribution <https://github.com/noredistribution>`_] +* Update setup.py to reference python3 only. (`272 <https://github.com/aristanetworks/cvprac/pull/272>`_) [`mharista <https://github.com/mharista>`_] +* Python3 lint/format fixes. (`273 <https://github.com/aristanetworks/cvprac/pull/273>`_) [`mharista <https://github.com/mharista>`_] diff --git a/requirements.txt b/requirements.txt index 9029d8d..108c9fb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ requests[socks]>=2.27.0 +packaging>=23.2 @@ -100,8 +100,13 @@ setup( # Specify the Python versions you support here. In particular, ensure # that you indicate whether you support Python 2, Python 3 or both. - 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3 :: Only', + 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8', + 'Programming Language :: Python :: 3.9', + 'Programming Language :: Python :: 3.10', + 'Programming Language :: Python :: 3.11', + 'Programming Language :: Python :: 3.12', ], # What does your project relate to? @@ -111,7 +116,7 @@ setup( # your project is installed. For an analysis of "install_requires" vs pip's # requirements files see: # https://packaging.python.org/en/latest/requirements.html - install_requires=['requests[socks]>=2.27.0'], + install_requires=['requests[socks]>=2.27.0', 'packaging>=23.2'], # List additional groups of dependencies here (e.g. development # dependencies). You can install these using the following syntax, |