path: root/cvprac/
diff options
Diffstat (limited to '')
1 files changed, 4100 insertions, 0 deletions
diff --git a/cvprac/ b/cvprac/
new file mode 100644
index 0000000..dc3fda1
--- /dev/null
+++ b/cvprac/
@@ -0,0 +1,4100 @@
+# Copyright (c) 2017, Arista Networks, Inc.
+# All rights reserved.
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+# Redistributions of source code must retain the above copyright notice,
+# this list of conditions and the following disclaimer.
+# Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+# Neither the name of Arista Networks nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+''' Class containing calls to CVP RESTful API.
+import operator
+import os
+import time
+# This import is for proper file IO handling support for both Python 2 and 3
+# pylint: disable=redefined-builtin
+from io import open
+from datetime import datetime
+from re import split
+from cvprac.cvp_client_errors import CvpApiError
+ from urllib import quote_plus as qplus
+except (AttributeError, ImportError):
+ from urllib.parse import quote_plus as qplus
+ '>':,
+ '<':,
+ '>=':,
+ '<=': operator.le,
+ '==': operator.eq
+class CvpApi(object):
+ ''' CvpApi class contains calls to CVP RESTful API. The RESTful API
+ parameters are passed in as parameters to the method. The results of
+ the RESTful API call are converted from json to a dict and returned.
+ Where needed minimal processing is performed on the results.
+ Any method that does a cvprac get or post call could raise the
+ following errors:
+ ConnectionError: A ConnectionError is raised if there was a network
+ problem (e.g. DNS failure, refused connection, etc)
+ CvpApiError: A CvpApiError is raised if there was a JSON error.
+ CvpRequestError: A CvpRequestError is raised if the request is not
+ properly constructed.
+ CvpSessionLogOutError: A CvpSessionLogOutError is raised if
+ reponse from server indicates session was logged out.
+ HTTPError: A HTTPError is raised if there was an invalid HTTP response.
+ ReadTimeout: A ReadTimeout is raised if there was a request
+ timeout when reading from the connection.
+ Timeout: A Timeout is raised if there was a request timeout.
+ TooManyRedirects: A TooManyRedirects is raised if the request exceeds
+ the configured number of maximum redirections
+ ValueError: A ValueError is raised when there is no valid
+ CVP session. This occurs because the previous get or post request
+ failed and no session could be established to a CVP node. Destroy
+ the class and re-instantiate.
+ '''
+ # pylint: disable=too-many-public-methods
+ # pylint: disable=too-many-lines
+ def __init__(self, clnt, request_timeout=30):
+ ''' Initialize the class.
+ Args:
+ clnt (obj): A CvpClient object
+ '''
+ self.clnt = clnt
+ self.log = clnt.log
+ self.request_timeout = request_timeout
+ def cvp_version_compare(self, opr, version, msg):
+ ''' Check provided version with given operator against the current CVP
+ version
+ Args:
+ opr (string): The operator. Valid operators are:
+ > - Greater Than
+ < - Less Than
+ >= - Greater Than or Equal To
+ <= - Less Than or Equal To
+ == - Equal To
+ version (float): The float API Version number to compare the
+ running CVP version to.
+ '''
+ if opr not in OPERATOR_DICT:
+ self.log.error('%s is an invalid operation for version comparison'
+ % opr)
+ return False
+ # Since CVaaS is automatically the latest version of the API, if
+ # operators > or >= are provided we can quickly check if we are running
+ # on CVaaS and return True if found.
+ if opr in ['>', '>='] and self.clnt.is_cvaas:
+ return True
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ # Example: if a version of 6.0 is provided with greater than or equal
+ # operator (>=) we are validating that the running CVP version is
+ # greater than or equal to API Version 6.0.
+ # Hence -- self.clnt.apiversion >= 6.0
+ if not OPERATOR_DICT[opr](self.clnt.apiversion, version):
+ self.log.warning(msg)
+ return False
+ return True
+ def get_cvp_info(self):
+ ''' Returns information about CVP.
+ Returns:
+ cvp_info (dict): CVP Information
+ '''
+ if not self.clnt.is_cvaas:
+ data = self.clnt.get('/cvpInfo/',
+ timeout=self.request_timeout)
+ else:
+ # For CVaaS do not run the getCvpInfo REST API and assume the
+ # latest version of the API
+ data = {'version': 'cvaas'}
+ if 'version' in data and self.clnt.apiversion is None:
+ self.clnt.set_version(data['version'])
+ return data
+ # pylint: disable=too-many-arguments
+ def add_user(self, username, password, role, status, first_name,
+ last_name, email, user_type):
+ ''' Add new local user to the CVP UI.
+ Args:
+ username (str): local username on CVP
+ password (str): password of the user
+ role (str): role of the user
+ status (str): state of the user (Enabled/Disabled)
+ first_name (str): first name of the user
+ last_name (str): last name of the user
+ email (str): email address of the user
+ user_type (str): type of AAA (Local/TACACS/RADIUS)
+ '''
+ if status not in ['Enabled', 'Disabled']:
+ self.log.error('Invalid status %s.'
+ ' Status must be Enabled or Disabled.'
+ ' Defaulting to Disabled' % status)
+ status = 'Disabled'
+ data = {"roles": [role],
+ "user": {"contactNumber": "",
+ "email": email,
+ "firstName": first_name,
+ "lastName": last_name,
+ "password": password,
+ "userId": username,
+ "userStatus": status,
+ "userType": user_type}}
+ return'/user/', data=data,
+ timeout=self.request_timeout)
+ def update_user(self, username, password, role, status, first_name,
+ last_name, email, user_type):
+ ''' Updates username information, like
+ changing password, user role, email address, names,
+ disable/enable the username.
+ Args:
+ username (str): local username on CVP
+ password (str): password of the user
+ role (str): role of the user
+ status (str): state of the user (Enabled/Disabled)
+ first_name (str): first name of the user
+ last_name (str): last name of the user
+ email (str): email address of the user
+ user_type (str): type of AAA (Local/TACACS/RADIUS)
+ '''
+ if status not in ['Enabled', 'Disabled']:
+ self.log.error('Invalid status %s.'
+ ' Status must be Enabled or Disabled.'
+ ' Defaulting to Disabled' % status)
+ status = 'Disabled'
+ data = {"roles": [role],
+ "user": {"contactNumber": "",
+ "email": email,
+ "firstName": first_name,
+ "lastName": last_name,
+ "password": password,
+ "userId": username,
+ "userStatus": status,
+ "userType": user_type}}
+ return'/user/{}'.format(username),
+ data=data, timeout=self.request_timeout)
+ def get_user(self, username):
+ ''' Returns specified user information
+ Args:
+ username (str): username on CVP
+ '''
+ return self.clnt.get('/user/{}'.format(username),
+ timeout=self.request_timeout)
+ def get_users(self, query='', start=0, end=0):
+ ''' Returns all users in CVP filtered by an optional query parameter
+ Args:
+ query (str): Query parameter to filter users by.
+ start (int): Start index for the pagination. Default is 0.
+ end (int): End index for the pagination. If end index is 0
+ then all the records will be returned. Default is 0.
+ Returns:
+ response (dict): A dict that contains the users.
+ {'total': 1,
+ 'roles': {'cvpadmin': ['network-admin']},
+ 'users': [{'userId': 'cvpadmin',
+ 'firstName': '',
+ 'lastName': '',
+ 'description': '',
+ 'email': '',
+ 'lastAccessed': 1654555139700,
+ 'contactNumber': '',
+ 'userType': 'Local',
+ 'userStatus': 'Enabled',
+ 'currentStatus': 'Online',
+ 'addedByUser': 'cvp system'}]}
+ '''
+ self.log.debug('get_users: query: %s' % query)
+ return self.clnt.get('/user/'
+ 'queryparam=%s&startIndex=%d&endIndex=%d' %
+ (qplus(query), start, end),
+ timeout=self.request_timeout)
+ def delete_user(self, username):
+ ''' Remove specified user from CVP
+ Args:
+ username (str): username on CVP
+ '''
+ data = [username]
+ return'/user/', data=data,
+ timeout=self.request_timeout)
+ def get_task_by_id(self, task_id):
+ ''' Returns the current CVP Task status for the task with the specified
+ TaskId.
+ Args:
+ task_id (int): CVP task identifier
+ Returns:
+ task (dict): The CVP task for the associated Id. Returns None
+ if the task_id was invalid.
+ '''
+ self.log.debug('get_task_by_id: task_id: %s' % task_id)
+ try:
+ task = self.clnt.get('/task/' % task_id,
+ timeout=self.request_timeout)
+ except CvpApiError as error:
+ self.log.debug('Caught error: %s attempting to get task.' % error)
+ # Catch an invalid task_id error and return None
+ return None
+ return task
+ def get_tasks_by_status(self, status, start=0, end=0):
+ ''' Returns a list of tasks with the given status.
+ Args:
+ status (str): Task status
+ start (int): Start index for the pagination. Default is 0.
+ end (int): End index for the pagination. If end index is 0
+ then all the records will be returned. Default is 0.
+ Returns:
+ tasks (list): The list of tasks
+ '''
+ self.log.debug('get_tasks_by_status: status: %s' % status)
+ data = self.clnt.get(
+ '/task/' %
+ (status, start, end), timeout=self.request_timeout)
+ return data['data']
+ def get_tasks(self, start=0, end=0):
+ ''' Returns a list of all the tasks.
+ Args:
+ start (int): Start index for the pagination. Default is 0.
+ end (int): End index for the pagination. If end index is 0
+ then all the records will be returned. Default is 0.
+ Returns:
+ tasks (dict): The 'total' key contains the number of tasks,
+ the 'data' key contains a list of the tasks.
+ '''
+ self.log.debug('get_tasks:')
+ return self.clnt.get('/task/'
+ 'endIndex=%d' % (start, end),
+ timeout=self.request_timeout)
+ def get_logs_by_id(self, task_id, start=0, end=0):
+ ''' Returns the log entries for the task with the specified TaskId.
+ Args:
+ task_id (int): CVP task identifier
+ start (int): The first log entry to return. Default is 0.
+ end (int): The last log entry to return. Default is 0 which
+ means to return all log entries. Can be a large number to
+ indicate the last log entry.
+ Returns:
+ task (dict): The CVP log for the associated Id. Returns None
+ if the task_id was invalid.
+ '''
+ self.log.debug('get_logs_by_id: task_id: %s' % task_id)
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ if self.clnt.apiversion < 5.0:
+ self.log.debug('v1 - v4 /task/')
+ resp = self.clnt.get('/task/'
+ '&startIndex=%d&endIndex=%d' %
+ (task_id, start, end),
+ timeout=self.request_timeout)
+ else:
+ self.log.debug('v5 /audit/')
+ task_info = self.get_task_by_id(task_id)
+ stage_id = None
+ if 'stageId' in task_info:
+ stage_id = task_info['stageId']
+ else:
+ self.log.debug('No stage ID found for task %s' % task_id)
+ if 'ccIdV2' in task_info:
+ cc_id = task_info['ccIdV2']
+ if cc_id == '':
+ self.log.debug('No ccIdV2 for task %s.'
+ ' It was likely cancelled.'
+ ' Using old /task/'
+ % task_id)
+ resp = self.clnt.get(
+ '/task/'
+ '&startIndex=%d&endIndex=%d' % (task_id, start, end),
+ timeout=self.request_timeout)
+ else:
+ resp = self.get_audit_logs_by_id(cc_id, stage_id)
+ else:
+ self.log.debug('No change ID found for task %s' % task_id)
+ resp = None
+ return resp
+ def get_audit_logs_by_id(self, cc_id, stage_id=None, data_size=75):
+ ''' Returns the audit logs of a particular ChangeControl.
+ Args:
+ cc_id (string): change control ID from ccIdV2 field
+ stage_id (string): stage ID from stageId field
+ data_size (int): data size
+ Returns:
+ task (dict): The CVP log for the associated ccIdV2
+ '''
+ data = {"category": "ChangeControl",
+ "startTime": 0,
+ "endTime": 0,
+ "dataSize": data_size,
+ "objectKey": cc_id,
+ "lastRetrievedAudit": {}}
+ if stage_id:
+ data["tags"] = {"stageId": stage_id}
+ return'/cvpservice/audit/', data=data,
+ timeout=self.request_timeout)
+ def add_note_to_task(self, task_id, note):
+ ''' Add notes to the task.
+ Args:
+ task_id (str): Task ID
+ note (str): Note to add to the task
+ '''
+ self.log.debug('add_note_to_task: task_id: %s note: %s' %
+ (task_id, note))
+ data = {'workOrderId': task_id, 'note': note}
+'/task/', data=data,
+ timeout=self.request_timeout)
+ def execute_task(self, task_id):
+ ''' Execute the task. Note that if the task has failed then inspect
+ the task logs to determine why the task failed. If you see:
+ Failure response received from the netElement: Unauthorized User
+ then it means that the netelement does not have the same user ID
+ and/or password as the CVP user executing the task.
+ Args:
+ task_id (str): Task ID
+ '''
+ self.log.debug('execute_task: task_id: %s' % task_id)
+ data = {'data': [task_id]}
+'/task/', data=data,
+ timeout=self.request_timeout)
+ def cancel_task(self, task_id):
+ ''' Cancel the task
+ Args:
+ task_id (str): Task ID
+ '''
+ self.log.debug('cancel_task: task_id: %s' % task_id)
+ data = {'data': [task_id]}
+ return'/task/', data=data,
+ timeout=self.request_timeout)
+ def get_configlets(self, start=0, end=0):
+ ''' Returns a list of all defined configlets.
+ Args:
+ start (int): Start index for the pagination. Default is 0.
+ end (int): End index for the pagination. If end index is 0
+ then all the records will be returned. Default is 0.
+ '''
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ configlets = self.clnt.get('/configlet/'
+ 'startIndex=%d&endIndex=%d' % (start, end),
+ timeout=self.request_timeout)
+ if self.clnt.apiversion == 1.0 or self.clnt.apiversion >= 4.0:
+ self.log.debug('v1/v4+ Inventory API Call')
+ return configlets
+ else:
+ self.log.debug('v2 Inventory API Call')
+ # New API getConfiglets does not return the actual configlet config
+ # Get the actual configlet config using getConfigletByName
+ if 'data' in configlets:
+ for configlet in configlets['data']:
+ full_cfglt_data = self.get_configlet_by_name(
+ configlet['name'])
+ configlet['config'] = full_cfglt_data['config']
+ return configlets
+ def get_configlets_and_mappers(self):
+ ''' Returns a list of all defined configlets and associated mappers
+ '''
+ self.log.debug(
+ 'get_configlets_and_mappers: getConfigletsAndAssociatedMappers')
+ return self.clnt.get('/configlet/')
+ def get_configlet_builder(self, c_id):
+ ''' Returns the configlet builder data for the given configlet ID.
+ Args:
+ c_id (str): The ID (key) for the configlet to be queried.
+ '''
+ return self.clnt.get('/configlet/'
+ % c_id, timeout=self.request_timeout)
+ def search_configlets(self, query, start=0, end=0):
+ ''' Returns a list of configlets that match a search query.
+ Args:
+ query (str): A simple string of text to be matched against
+ the existing configlets. Not a regex.
+ start (int): Start index for the pagination. Default is 0.
+ end (int): End index for the pagination. If end index is 0
+ then all the records will be returned. Default is 0.
+ '''
+ self.log.debug('search_configlets: query: %s' % query)
+ return self.clnt.get('/configlet/'
+ 'queryparam=%s&startIndex=%d&endIndex=%d' %
+ (qplus(query), start, end),
+ timeout=self.request_timeout)
+ def get_configlet_by_name(self, name):
+ ''' Returns the configlet with the specified name
+ Args:
+ name (str): Name of the configlet. Can contain spaces.
+ Returns:
+ configlet (dict): The configlet dict.
+ '''
+ self.log.debug('get_configlets_by_name: name: %s' % name)
+ return self.clnt.get('/configlet/'
+ % qplus(name), timeout=self.request_timeout)
+ def get_configlets_by_container_id(self, c_id, start=0, end=0):
+ ''' Returns a list of configlets applied to the given container.
+ Args:
+ c_id (str): The container ID (key) to query.
+ start (int): Start index for the pagination. Default is 0.
+ end (int): End index for the pagination. If end index is 0
+ then all the records will be returned. Default is 0.
+ '''
+ return self.clnt.get('/provisioning/'
+ 'containerId=%s&startIndex=%d&endIndex=%d'
+ % (c_id, start, end),
+ timeout=self.request_timeout)
+ def get_configlets_by_netelement_id(self, d_id, start=0, end=0):
+ ''' Returns a list of configlets applied to the given device.
+ Args:
+ d_id (str): The device ID (key) to query.
+ start (int): Start index for the pagination. Default is 0.
+ end (int): End index for the pagination. If end index is 0
+ then all the records will be returned. Default is 0.
+ '''
+ return self.clnt.get('/provisioning/'
+ 'netElementId=%s&startIndex=%d&endIndex=%d'
+ % (d_id, start, end),
+ timeout=self.request_timeout)
+ def get_image_bundle_by_container_id(self, container_id, start=0, end=0,
+ scope='false'):
+ ''' Returns a list of ImageBundles applied to the given container.
+ Args:
+ container_id (str): The container ID (key) to query.
+ start (int): Start index for the pagination. Default is 0.
+ end (int): End index for the pagination. If end index is 0
+ then all the records will be returned. Default is 0.
+ scope (string) the session scope (true or false).
+ '''
+ if scope != 'true' and scope != 'false':
+ self.log.error('scope value must be true or false.'
+ ' %s is an invalid value.'
+ ' Defaulting back to false' % scope)
+ scope = 'false'
+ return self.clnt.get('/provisioning/'
+ 'containerId=%s&startIndex=%d&endIndex=%d'
+ '&sessionScope=%s'
+ % (container_id, start, end, scope),
+ timeout=self.request_timeout)
+ def get_configlet_history(self, key, start=0, end=0):
+ ''' Returns the configlet history.
+ Args:
+ key (str): Key for the configlet.
+ start (int): The first configlet entry to return. Default is 0
+ end (int): The last configlet entry to return. Default is 0
+ which means to return all configlet entries. Can be a
+ large number to indicate the last configlet entry.
+ Returns:
+ history (dict): The configlet dict with the changes from
+ most recent to oldest.
+ '''
+ self.log.debug('get_configlets_history: key: %s' % key)
+ return self.clnt.get('/configlet/'
+ '%s&queryparam=&startIndex=%d&endIndex=%d' %
+ (key, start, end), timeout=self.request_timeout)
+ def get_inventory(self, start=0, end=0, query=''):
+ ''' Returns the a dict of the net elements known to CVP.
+ Args:
+ start (int): The first inventory entry to return. Default is 0
+ end (int): The last inventory entry to return. Default is 0
+ which means to return all inventory entries. Can be a
+ large number to indicate the last inventory entry.
+ query (string): A value that can be used as a match to filter
+ returned inventory list. For example get all switches that
+ are running a specific version of EOS.
+ '''
+ self.log.debug('get_inventory: called')
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ if self.clnt.apiversion == 1.0:
+ self.log.debug('v1 Inventory API Call')
+ data = self.clnt.get('/inventory/'
+ 'queryparam=%s&startIndex=%d&endIndex=%d' %
+ (qplus(query), start, end),
+ timeout=self.request_timeout)
+ return data['netElementList']
+ self.log.debug('v2 Inventory API Call')
+ data = self.clnt.get('/inventory/devices?provisioned=true',
+ timeout=self.request_timeout)
+ containers = self.get_containers()
+ for dev in data:
+ dev['key'] = dev['systemMacAddress']
+ dev['deviceInfo'] = dev['deviceStatus'] = dev['status']
+ dev['isMLAGEnabled'] = dev['mlagEnabled']
+ dev['isDANZEnabled'] = dev['danzEnabled']
+ dev['parentContainerId'] = dev['parentContainerKey']
+ dev['bootupTimeStamp'] = dev['bootupTimestamp']
+ dev['internalBuildId'] = dev['internalBuild']
+ if 'taskIdList' not in dev:
+ dev['taskIdList'] = []
+ if 'tempAction' not in dev:
+ dev['tempAction'] = None
+ dev['memTotal'] = 0
+ dev['memFree'] = 0
+ dev['sslConfigAvailable'] = False
+ dev['sslEnabledByCVP'] = False
+ dev['lastSyncUp'] = 0
+ dev['type'] = 'netelement'
+ dev['dcaKey'] = None
+ container_found = False
+ for container in containers['data']:
+ if dev['parentContainerKey'] == container['key']:
+ dev['containerName'] = container['name']
+ container_found = True
+ if not container_found:
+ dev['containerName'] = ''
+ return data
+ def add_devices_to_inventory(self, device_list, wait=False):
+ ''' Add a list of devices to the specified parent container.
+ Args:
+ device_list (list): A list of devices to be added in the
+ form of dictionaries. Each device dictionary should
+ contain the following information:
+ - device_ip (str): ip address of device we are adding
+ - parent_name (str): Parent container name
+ - parent_key (str): Parent container key
+ wait (boolean): Specifies whether to allow a wait time for
+ devices to appear in inventory before moving them to
+ the specified container. Applies to v2 API only.
+ Example device list:
+ device_list = [
+ {
+ device_ip: '',
+ parent_name: 'Tenant',
+ parent_key: 'root'
+ },
+ {
+ device_ip: '',
+ parent_name: 'MyContainer',
+ parent_key: 'container-id-1234'
+ }
+ ]
+ '''
+ self.log.debug('add_device_to_inventory: called')
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ if self.clnt.apiversion == 1.0:
+ self.log.debug('v1 Inventory API Call')
+ data_list = []
+ for device in device_list:
+ dev_data = {
+ 'containerName': device['parent_name'],
+ 'containerId': device['parent_key'],
+ 'containerType': 'Existing',
+ 'ipAddress': device['device_ip'],
+ 'containerList': []
+ }
+ data_list.append(dev_data)
+ data = {'data': data_list}
+ 'startIndex=0&endIndex=0', data=data,
+ timeout=self.request_timeout)
+ else:
+ self.log.debug('v2 Inventory API Call')
+ # Create a list of device IPs
+ device_ips = [dev['device_ip'] for dev in device_list]
+ # First add the devices to the inventory in a single call
+ data = {'hosts': device_ips}
+'/inventory/devices', data=data,
+ timeout=self.request_timeout)
+ # Get the inventory list
+ inv = self.get_inventory()
+ if wait:
+ # With v2, the devices can take a few moments to appear
+ # We need them present before we can move them to a container
+ timeout = time.time() + 600
+ while device_ips and time.time() < timeout:
+ inv_devices = [dev['ipAddress'] for dev in inv]
+ device_ips = list(set(device_ips) - set(inv_devices))
+ if device_ips:
+ time.sleep(2)
+ inv = self.get_inventory()
+ if device_ips:
+ # If any devices did not appear, there is a problem
+ # Join the missing IPs into a string for output
+ missing_ips = ', '.join(device_ips)
+ raise RuntimeError('Devices {} failed to appear '
+ 'in inventory'.format(missing_ips))
+ # Move the devices to their specified containers
+ for device in device_list:
+ devs = [dev for dev in inv if 'ipAddress' in dev and
+ device['device_ip'] in dev['ipAddress']]
+ dev = devs[0]
+ container = {'key': device['parent_key'],
+ 'name': device['parent_name']}
+ self.move_device_to_container('add_device_to_inventory API v2',
+ dev, container, False)
+ def add_device_to_inventory(self, device_ip, parent_name,
+ parent_key, wait=False):
+ ''' Add the device to the specified parent container.
+ Args:
+ device_ip (str): ip address of device we are adding
+ parent_name (str): Parent container name
+ parent_key (str): Parent container key
+ '''
+ # Put parameters into a dictionary and call add_devices_to_inventory
+ device = {
+ 'device_ip': device_ip,
+ 'parent_name': parent_name,
+ 'parent_key': parent_key
+ }
+ self.add_devices_to_inventory([device], wait=wait)
+ def retry_add_to_inventory(self, device_mac, device_ip, username,
+ password):
+ '''Retry addition of device to Cvp inventory
+ Args:
+ device_mac (str): MAC address of device
+ device_ip (str): ip address assigned to device
+ username (str): username for device login
+ password (str): password for user
+ '''
+ self.log.debug('retry_add_to_inventory: called')
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ if self.clnt.apiversion == 1.0:
+ self.log.debug('v1 Inventory API Call')
+ data = {"key": device_mac,
+ "ipAddress": device_ip,
+ "userName": username,
+ "password": password}
+ 'startIndex=0&endIndex=0',
+ data=data,
+ timeout=self.request_timeout)
+ else:
+ self.log.debug('v2 Inventory API Call')
+ self.log.warning(
+ 'retry_add_to_inventory: not implemented for v2 APIs')
+ def delete_device(self, device_mac):
+ '''Delete the device and its pending tasks from Cvp inventory
+ Args:
+ device_mac (str): mac address of device we are deleting
+ For CVP 2020 this param is now required to
+ be the device serial number instead of MAC
+ address. This method will handle getting
+ the device serial number via the provided
+ MAC address.
+ Returns:
+ data (dict): Contains success or failure message
+ '''
+ self.log.debug('delete_device: called')
+ return self.delete_devices([device_mac])
+ def delete_devices(self, device_macs):
+ '''Delete the device and its pending tasks from Cvp inventory
+ Args:
+ device_macs (list): list of mac address for
+ devices we're deleting
+ For CVP 2020 this param is now required to
+ be a list of device serial numbers instead
+ of MAC addresses. This method will handle
+ getting the device serial number via the
+ provided MAC address.
+ Returns:
+ data (dict): Contains success or failure message
+ '''
+ self.log.debug('delete_devices: called')
+ resp = None
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ if self.clnt.apiversion < 4.0:
+ data = {'data': device_macs}
+ resp ='/inventory/', data=data,
+ timeout=self.request_timeout)
+ else:
+ self.log.warning('NOTE: The Delete Devices API has updated for'
+ ' CVP 2020.2 and it is not required to send the'
+ ' device serial number instead of mac address'
+ ' when deleting a device. Looking up each devices'
+ 'serial num based on provided MAC addresses')
+ devices = []
+ for dev_mac in device_macs:
+ device_info = self.get_device_by_mac(dev_mac)
+ if device_info is not None and 'serialNumber' in device_info:
+ devices.append(device_info)
+ resp = self.delete_devices_by_serial(devices)
+ return resp
+ def delete_devices_by_serial(self, devices):
+ '''Delete the device and its pending tasks from Cvp inventory
+ Args:
+ devices (list): list of device objects to be deleted
+ Returns:
+ data (dict): Contains success or failure message
+ '''
+ device_serials = []
+ for device in devices:
+ device_serials.append(device['serialNumber'])
+ data = {'data': device_serials}
+ resp = self.clnt.delete('/inventory/devices', data=data,
+ timeout=self.request_timeout)
+ return resp
+ def get_non_connected_device_count(self):
+ '''Returns number of devices not accessible/connected in the temporary
+ inventory.
+ Returns:
+ data (int): Number of temporary inventory devices not
+ accessible/connected
+ '''
+ self.log.debug('get_non_connected_device_count: called')
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ if self.clnt.apiversion == 1.0:
+ self.log.debug('v1 Inventory API Call')
+ data = self.clnt.get(
+ '/inventory/add/',
+ timeout=self.request_timeout)
+ return data['data']
+ self.log.debug('v2 Inventory API Call')
+ data = self.clnt.get('/inventory/devices?provisioned=false',
+ timeout=self.request_timeout)
+ unprovisioned_devs = 0
+ for dev in data:
+ if 'status' in dev and dev['status'] == '':
+ unprovisioned_devs += 1
+ return unprovisioned_devs
+ def save_inventory(self):
+ '''Saves Cvp inventory state
+ '''
+ self.log.debug('save_inventory: called')
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ if self.clnt.apiversion == 1.0:
+ self.log.debug('v1 Inventory API Call')
+ return'/inventory/add/',
+ timeout=self.request_timeout)
+ self.log.debug('v2 Inventory API Call')
+ message = 'Save Inventory not implemented/necessary for' +\
+ ' CVP 2018.2 and beyond'
+ data = {'data': 0, 'message': message}
+ return data
+ def get_devices_in_container(self, name):
+ ''' Returns a dict of the devices under the named container.
+ Args:
+ name (str): The name of the container to get devices from
+ '''
+ self.log.debug('get_devices_in_container: called')
+ devices = []
+ container = self.get_container_by_name(name)
+ if container:
+ all_devices = self.get_inventory(0, 0, name)
+ for device in all_devices:
+ if device['parentContainerId'] == container['key']:
+ devices.append(device)
+ return devices
+ def get_device_by_name(self, fqdn, search_by_hostname=False):
+ ''' Returns the net element device dict for the devices fqdn name.
+ Args:
+ fqdn (str): Fully qualified domain name or hostname of the
+ device.
+ search_by_hostname (boolean): if set True will attempt to split
+ the fqdn string to match on the hostname portion
+ specifically which should be the first component
+ Returns:
+ device (dict): The net element device dict for the device if
+ otherwise returns an empty hash.
+ '''
+ self.log.debug('get_device_by_name: fqdn: %s' % fqdn)
+ # data = self.get_inventory(start=0, end=0, query=fqdn)
+ data = self.search_topology(fqdn)
+ device = {}
+ if 'netElementList' in data:
+ for netelem in data['netElementList']:
+ if not search_by_hostname:
+ if netelem['fqdn'] == fqdn:
+ device = netelem
+ break
+ else:
+ if netelem['fqdn'].split('.')[0] == fqdn:
+ device = netelem
+ break
+ return device
+ def get_device_by_mac(self, device_mac):
+ ''' Returns the net element device dict for the devices mac address.
+ Args:
+ device_mac (str): MAC Address of the device.
+ Returns:
+ device (dict): The net element device dict for the device if
+ otherwise returns an empty hash.
+ '''
+ self.log.debug('get_device_by_mac: MAC address: %s' % device_mac)
+ # data = self.get_inventory(start=0, end=0, query=device_mac)
+ data = self.search_topology(device_mac)
+ device = {}
+ if 'netElementList' in data:
+ for netelem in data['netElementList']:
+ if netelem['systemMacAddress'] == device_mac:
+ device = netelem
+ break
+ return device
+ def get_device_by_serial(self, device_serial):
+ ''' Returns the net element device dict for the devices serial number.
+ Args:
+ device_serial (str): Serial number of the device.
+ Returns:
+ device (dict): The net element device dict for the device if
+ otherwise returns an empty hash.
+ '''
+ self.log.debug('get_device_by_serial: Serial Number: %s'
+ % device_serial)
+ data = self.search_topology(device_serial)
+ device = {}
+ if 'netElementList' in data:
+ for netelem in data['netElementList']:
+ if netelem['serialNumber'] == device_serial:
+ device = netelem
+ break
+ return device
+ def get_device_configuration(self, device_mac):
+ ''' Returns the running configuration for the device provided.
+ Args:
+ device_mac (str): Mac address of the device to get the running
+ configuration for.
+ Returns:
+ device (dict): The net element device dict for the device if
+ otherwise returns an empty hash.
+ '''
+ self.log.debug('get_device_configuration: device_mac: %s' % device_mac)
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ if self.clnt.apiversion < 4.0:
+ data = self.clnt.get('/inventory/'
+ 'netElementId=%s' % device_mac,
+ timeout=self.request_timeout)
+ else:
+ data = self.clnt.get('/inventory/device/config?'
+ 'netElementId=%s' % device_mac,
+ timeout=self.request_timeout)
+ running_config = ''
+ if 'output' in data:
+ running_config = data['output']
+ return running_config
+ def get_device_image_info(self, device_mac):
+ ''' Return a dict of info about a device in CVP.
+ Args:
+ device_mac (str): Mac address of the device to get the running
+ configuration for.
+ Returns:
+ device_image_info (dict): Dict of image info for the device
+ if found. Otherwise returns None.
+ '''
+ self.log.debug('Attempt to get net element data for %s' % device_mac)
+ try:
+ device_image_info = self.clnt.get(
+ '/provisioning/'
+ % qplus(device_mac), timeout=self.request_timeout)
+ except CvpApiError as error:
+ # Catch error when device for provided MAC is not found
+ if 'Invalid Netelement id' in str(error):
+ self.log.debug('Device with MAC %s not found' % device_mac)
+ return None
+ raise error
+ return device_image_info
+ def get_containers(self, start=0, end=0):
+ ''' Returns a list of all the containers.
+ Args:
+ start (int): Start index for the pagination. Default is 0.
+ end (int): End index for the pagination. If end index is 0
+ then all the records will be returned. Default is 0.
+ Returns:
+ containers (dict): The 'total' key contains the number of
+ containers, the 'data' key contains a list of the containers
+ with associated info.
+ '''
+ self.log.debug('Get list of containers')
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ if self.clnt.apiversion == 1.0:
+ self.log.debug('v1 Inventory API Call')
+ return self.clnt.get('/inventory/add/'
+ 'startIndex=%d&endIndex=%d' % (start, end))
+ self.log.debug('v2 Inventory API Call')
+ containers = self.clnt.get('/inventory/containers')
+ for container in containers:
+ container['name'] = container['Name']
+ container['key'] = container['Key']
+ full_cont_info = self.get_container_by_id(
+ container['Key'])
+ if (full_cont_info is not None and
+ container['Key'] != 'root'):
+ container['parentName'] = full_cont_info['parentName']
+ for cont in containers:
+ if cont['Name'] == full_cont_info['parentName']:
+ container['parentId'] = cont['Key']
+ break
+ else:
+ self.log.debug(
+ 'No container parentId found for parentName %s',
+ full_cont_info['parentName'])
+ container['parentId'] = None
+ else:
+ container['parentName'] = None
+ container['parentId'] = None
+ container['type'] = None
+ container['id'] = 21
+ container['factoryId'] = 1
+ container['userId'] = None
+ container['childContainerId'] = None
+ return {'data': containers, 'total': len(containers)}
+ def get_container_by_name(self, name):
+ ''' Returns a container that exactly matches the name.
+ Args:
+ name (str): String to search for in container names.
+ Returns:
+ container (dict): Container info in dictionary format or None
+ '''
+ self.log.debug('Get info for container %s' % name)
+ conts = self.clnt.get('/provisioning/'
+ '&startIndex=0&endIndex=0' % qplus(name))
+ if conts['total'] > 0 and conts['containerList']:
+ for cont in conts['containerList']:
+ if cont['name'] == name:
+ return cont
+ return None
+ def get_container_by_id(self, key):
+ ''' Returns a container for the given id.
+ Args:
+ key (str): String ID for container to find.
+ Returns:
+ container (dict): Container info in dictionary format or None
+ '''
+ self.log.debug('Get info for container %s' % key)
+ return self.clnt.get('/provisioning/'
+ 'containerId=%s' % qplus(key))
+ def get_configlets_by_device_id(self, mac, start=0, end=0):
+ ''' Returns the list of configlets applied to a device.
+ Args:
+ mac (str): Device mac address (i.e. device id)
+ start (int): The first configlet entry to return. Default is 0
+ end (int): The last configlet entry to return. Default is 0
+ which means to return all configlet entries. Can be a
+ large number to indicate the last configlet entry.
+ Returns:
+ configlets (list): The list of configlets applied to the device
+ '''
+ self.log.debug('get_configlets_by_device: mac: %s' % mac)
+ data = self.get_configlets_by_netelement_id(mac, start, end)
+ return data['configletList']
+ def add_configlet_builder(self, name, config, draft=False, form=None):
+ ''' Add a confilget builder and return the key for the configlet builder.
+ Args:
+ name (str): Configlet builder name
+ config (str): Python configlet builder code
+ draft (bool): If builder is a draft
+ form (list): Array/list of form data
+ Parameters:
+ fieldId (str): "",
+ fieldLabel (str): "",
+ value (str): "",
+ type (str): "", (Options below)
+ ('Text box',
+ 'Text area',
+ 'Drop down',
+ 'Check box',
+ 'Radio button',
+ 'IP address',
+ 'Password')
+ validation: {
+ mandatory (boolean): true,
+ },
+ helpText (str): "",
+ depends (str): "",
+ dataValidationErrorExist (boolean): true,
+ dataValidation (string): ""
+ Returns:
+ key (str): The key for the configlet
+ '''
+ if not form:
+ form = []
+ self.log.debug('add_configlet_builder: name: %s config: %s form: %s'
+ % (name, config, form))
+ data = {'name': name,
+ 'data': {'formList': form,
+ 'main_script': {'data': config}}}
+ # Create the configlet builder
+'/configlet/' % draft,
+ data=data, timeout=self.request_timeout)
+ # Get the key for the configlet
+ data = self.clnt.get(
+ '/configlet/' % qplus(name),
+ timeout=self.request_timeout)
+ return data['key']
+ def add_configlet(self, name, config):
+ ''' Add a configlet and return the key for the configlet.
+ Args:
+ name (str): Configlet name
+ config (str): Switch config statements
+ Returns:
+ key (str): The key for the configlet
+ '''
+ self.log.debug('add_configlet: name: %s config: %s' % (name, config))
+ body = {'name': name, 'config': config}
+ # Create the configlet
+'/configlet/', data=body,
+ timeout=self.request_timeout)
+ # Get the key for the configlet
+ data = self.clnt.get('/configlet/'
+ % qplus(name), timeout=self.request_timeout)
+ return data['key']
+ def delete_configlet(self, name, key):
+ ''' Delete the configlet.
+ Args:
+ name (str): Configlet name
+ key (str): Configlet key
+ '''
+ self.log.debug('delete_configlet: name: %s key: %s' % (name, key))
+ body = [{'name': name, 'key': key}]
+ # Delete the configlet
+'/configlet/', data=body,
+ timeout=self.request_timeout)
+ def update_configlet(self, config, key, name, wait_task_ids=False):
+ ''' Update a configlet.
+ Args:
+ config (str): Switch config statements
+ key (str): Configlet key
+ name (str): Configlet name
+ wait_task_ids (boolean): Wait for task IDs to generate
+ Returns:
+ data (dict): Contains success or failure message
+ '''
+ self.log.debug('update_configlet: config: %s key: %s name: %s' %
+ (config, key, name))
+ # Update the configlet
+ body = {'config': config, 'key': key, 'name': name,
+ 'waitForTaskIds': wait_task_ids}
+ return'/configlet/', data=body,
+ timeout=self.request_timeout)
+ def update_configlet_builder(self, name, key, config, draft=False,
+ wait_for_task=False, form=None):
+ ''' Update an existing configlet builder.
+ Args:
+ config (str): Contents of the configlet builder configuration
+ key: (str): key/id of the configlet builder to be updated
+ name: (str): name of the configlet builder
+ draft (boolean): is update a draft
+ wait_for_task (boolean): wait for task IDs to be generated
+ form (list): Array/list of form data
+ Parameters:
+ fieldId (str): "",
+ fieldLabel (str): "",
+ value (str): "",
+ type (str): "", (Options below)
+ ('Text box',
+ 'Text area',
+ 'Drop down',
+ 'Check box',
+ 'Radio button',
+ 'IP address',
+ 'Password')
+ validation: {
+ mandatory (boolean): true,
+ },
+ helpText (str): "",
+ depends (str): "",
+ dataValidationErrorExist (boolean): true,
+ dataValidation (string): ""
+ '''
+ if not form:
+ form = []
+ data = {
+ "name": name,
+ "waitForTaskIds": wait_for_task,
+ "data": {
+ "formList": form,
+ "main_script": {
+ "data": config
+ }
+ }
+ }
+ debug_str = 'update_configlet_builder:' \
+ ' config: {} key: {} name: {} form: {}'
+ self.log.debug(debug_str.format(config, key, name, form))
+ # Update the configlet builder
+ url_string = '/configlet/' \
+ 'isDraft={}&id={}&action=save'
+ return, key),
+ data=data, timeout=self.request_timeout)
+ def update_reconcile_configlet(self, device_mac, config, key, name,
+ reconciled=False):
+ ''' Update the reconcile configlet.
+ Args:
+ device_mac (str): Mac address/Key for device whose reconcile
+ configlet is being updated
+ config (str): Reconciled config statements
+ key (str): Reconcile Configlet key
+ name (str): Reconcile Configlet name
+ reconciled (boolean): Wait for task IDs to generate
+ Returns:
+ data (dict): Contains success or failure message
+ '''
+ log_str = ('update_reconcile_configlet:'
+ ' device_mac: {} config: {} key: {} name: {}')
+ self.log.debug(log_str.format(device_mac, config, key, name))
+ url_str = ('/provisioning/'
+ 'netElementId={}')
+ body = {
+ 'config': config,
+ 'key': key,
+ 'name': name,
+ 'reconciled': reconciled,
+ 'unCheckedLines': '',
+ }
+ return, data=body,
+ timeout=self.request_timeout)
+ def add_note_to_configlet(self, key, note):
+ ''' Add a note to a configlet.
+ Args:
+ key (str): Configlet key
+ note (str): Note to be added to configlet.
+ '''
+ data = {
+ 'key': key,
+ 'note': note,
+ }
+ return'/configlet/',
+ data=data, timeout=self.request_timeout)
+ def sanitize_warnings(self, data):
+ ''' Sanitize the warnings returned after validation.
+ In some cases where the configlets has both errors
+ and warnings, CVP may split any warnings that have
+ `,` across multiple strings.
+ This method concats the strings back into one string
+ per warning, and correct the warningCount.
+ Args:
+ data (dict): A dict that contians the result
+ of the validation operation
+ Returns:
+ response (dict): A dict that contains the result of the
+ validation operation
+ '''
+ if "warnings" not in data:
+ # nothing to do here, we can return as is
+ return data
+ # Since there may be warnings incorrectly split on
+ # ', ' within the warning text by CVP, we join all the
+ # warnings together using ', ' into one large string
+ temp_warnings = ", ".join(data['warnings']).strip()
+ # To split the large string again we match on the
+ # 'at line XXX' that should indicate the end of the warning.
+ # We capture as well the remaining \\n or whitespace and include
+ # the extra ', ' added in the previous step in the matching criteria.
+ # The extra ', ' is not included in the strings of the new list
+ temp_warnings = split(
+ r'(.*?at line \d+.*?),\s+',
+ temp_warnings
+ )
+ # The behaviour of re.split will add empty strings
+ # if the regex matches on the begging or ending of the line.
+ # Refer to
+ # Use filter to remove any empty strings
+ # that re.split inserted
+ data['warnings'] = list(filter(None, temp_warnings))
+ # Update the count of warnings to the correct value
+ data['warningCount'] = len(data['warnings'])
+ return data
+ def validate_config_for_device(self, device_mac, config):
+ ''' Validate a config against a device
+ Args:
+ device_mac (str): Device MAC address
+ config (str): Switch config statements
+ Returns:
+ response (dict): A dict that contains the result of the
+ validation operation
+ '''
+ self.log.debug('validate_config_for_device: device_mac: %s config: %s'
+ % (device_mac, config))
+ body = {'netElementId': device_mac, 'config': config}
+ return self.sanitize_warnings(
+ '/configlet/',
+ data=body,
+ timeout=self.request_timeout
+ )
+ )
+ def validate_config(self, device_mac, config):
+ ''' Validate a config against a device and parse response to
+ produce log messages are return a flag for the config validity.
+ Args:
+ device_mac (str): Device MAC address
+ config (str): Switch config statements
+ Returns:
+ response (boolean): A flag signifying if the config is valid or
+ not.
+ '''
+ self.log.debug('validate_config: device_mac: %s config: %s'
+ % (device_mac, config))
+ result = self.validate_config_for_device(device_mac, config)
+ validated = True
+ if 'warningCount' in result and result['warnings']:
+ for warning in result['warnings']:
+ self.log.warning('Validation of config produced warning - %s'
+ % warning)
+ if 'errorCount' in result:
+ self.log.error('Validation of config produced %s errors'
+ % result['errorCount'])
+ if 'errors' in result:
+ for error in result['errors']:
+ self.log.error('Validation of config produced error - %s'
+ % error)
+ validated = False
+ if 'result' in result:
+ for item in result['result']:
+ if 'messages' in item:
+ for message in item['messages']:
+'Validation of config returned'
+ ' message - %s' % message)
+ return validated
+ def get_all_temp_actions(self, start=0, end=0):
+ ''' Returns a list of existing temp actions.
+ Args:
+ start (int): Start index for the pagination. Default is 0.
+ end (int): End index for the pagination. If end index is 0
+ then all the records will be returned. Default is 0.
+ Returns:
+ response (dict): A dict that contains a list of the current
+ temp actions.
+ '''
+ url = ('/provisioning/'
+ % (start, end))
+ data = self.clnt.get(url, timeout=self.request_timeout)
+ return data
+ def _add_temp_action(self, data):
+ ''' Adds temp action that requires a saveTopology call to take effect.
+ Args:
+ data (dict): a data dict with a specific format for the
+ desired action.
+ Base Ex: data = {'data': [{specific key/value pairs}]}
+ '''
+ url = ('/provisioning/'
+ 'format=topology&queryParam=&nodeId=root')
+, data=data, timeout=self.request_timeout)
+ def _save_topology_v2(self, data):
+ ''' Confirms a previously created temp action.
+ Args:
+ data (list): a list that contains a dict with a specific
+ format for the desired action. Our primary use case is for
+ confirming existing temp actions so we most often send an
+ empty list to confirm an existing temp action.
+ Returns:
+ response (dict): A dict that contains a status and a list of
+ task ids created (if any).
+ Ex: {u'data': {u'status': u'success', u'taskIds': []}}
+ '''
+ url = '/provisioning/v2/'
+ return, data=data, timeout=self.request_timeout)
+ def apply_configlets_to_device(self, app_name, dev, new_configlets,
+ create_task=True, reorder_configlets=False):
+ ''' Apply the configlets to the device.
+ Args:
+ app_name (str): The application name to use in info field.
+ dev (dict): The switch device dict
+ new_configlets (list): List of configlet name and key pairs
+ create_task (bool): Determines whether or not to execute a save
+ and create the tasks (if any)
+ reorder_configlets (bool): Defaults to False. To use this
+ parameter you must first get the full list of configlets
+ applied to the device (for example via the
+ get_configlets_by_device_id function) and provide the
+ full list of configlets (in addition to any new configlets
+ being applied) in the desired order as the new_configlets
+ parameter. It is also important to keep in mind configlets
+ that are applied to parent containers because they will
+ be applied before configlets applied to the device
+ directly. Set this parameter to True only with the full
+ list of configlets being applied to the device provided
+ via the new_configlets parameter.
+ Returns:
+ response (dict): A dict that contains a status and a list of
+ task ids created (if any).
+ Ex: {u'data': {u'status': u'success', u'taskIds': [u'32']}}
+ '''
+ self.log.debug('apply_configlets_to_device: dev: %s names: %s' %
+ (dev, new_configlets))
+ # Get a list of the names and keys of the configlets
+ cnames = []
+ ckeys = []
+ if not reorder_configlets:
+ # Get all the configlets assigned to the device.
+ configlets = self.get_configlets_by_device_id(
+ dev['systemMacAddress'])
+ for configlet in configlets:
+ cnames.append(configlet['name'])
+ ckeys.append(configlet['key'])
+ # Add the new configlets to the end of the arrays
+ for entry in new_configlets:
+ cnames.append(entry['name'])
+ ckeys.append(entry['key'])
+ info = '%s: Configlet Assign: to Device %s' % (app_name, dev['fqdn'])
+ info_preview = '<b>Configlet Assign:</b> to Device' + dev['fqdn']
+ data = {'data': [{'info': info,
+ 'infoPreview': info_preview,
+ 'note': '',
+ 'action': 'associate',
+ 'nodeType': 'configlet',
+ 'nodeId': '',
+ 'configletList': ckeys,
+ 'configletNamesList': cnames,
+ 'ignoreConfigletNamesList': [],
+ 'ignoreConfigletList': [],
+ 'configletBuilderList': [],
+ 'configletBuilderNamesList': [],
+ 'ignoreConfigletBuilderList': [],
+ 'ignoreConfigletBuilderNamesList': [],
+ 'toId': dev['systemMacAddress'],
+ 'toIdType': 'netelement',
+ 'fromId': '',
+ 'nodeName': '',
+ 'fromName': '',
+ 'toName': dev['fqdn'],
+ 'nodeIpAddress': dev['ipAddress'],
+ 'nodeTargetIpAddress': dev['ipAddress'],
+ 'childTasks': [],
+ 'parentTask': ''}]}
+ self.log.debug('apply_configlets_to_device: saveTopology data:\n%s' %
+ data['data'])
+ self._add_temp_action(data)
+ if create_task:
+ return self._save_topology_v2([])
+ return None
+ # pylint: disable=too-many-locals
+ def remove_configlets_from_device(self, app_name, dev, del_configlets,
+ create_task=True):
+ ''' Remove the configlets from the device.
+ Args:
+ app_name (str): The application name to use in info field.
+ dev (dict): The switch device dict
+ del_configlets (list): List of configlet name and key pairs
+ create_task (bool): Determines whether or not to execute a save
+ and create the tasks (if any)
+ Returns:
+ response (dict): A dict that contains a status and a list of
+ task ids created (if any).
+ Ex: {u'data': {u'status': u'success', u'taskIds': [u'35']}}
+ '''
+ self.log.debug('remove_configlets_from_device: dev: %s names: %s' %
+ (dev, del_configlets))
+ # Get all the configlets assigned to the device.
+ configlets = self.get_configlets_by_device_id(dev['systemMacAddress'])
+ # Get a list of the names and keys of the configlets. Do not add
+ # configlets that are on the delete list.
+ keep_names = []
+ keep_keys = []
+ for configlet in configlets:
+ key = configlet['key']
+ if next((ent for ent in del_configlets if ent['key'] == key),
+ None) is None:
+ keep_names.append(configlet['name'])
+ keep_keys.append(key)
+ # Remove the names and keys of the configlets to keep and build a
+ # list of the configlets to remove.
+ del_names = []
+ del_keys = []
+ for entry in del_configlets:
+ del_names.append(entry['name'])
+ del_keys.append(entry['key'])
+ info = '%s Configlet Remove: from Device %s' % (app_name, dev['fqdn'])
+ info_preview = '<b>Configlet Remove:</b> from Device' + dev['fqdn']
+ data = {'data': [{'info': info,
+ 'infoPreview': info_preview,
+ 'note': '',
+ 'action': 'associate',
+ 'nodeType': 'configlet',
+ 'nodeId': '',
+ 'configletList': keep_keys,
+ 'configletNamesList': keep_names,
+ 'ignoreConfigletNamesList': del_names,
+ 'ignoreConfigletList': del_keys,
+ 'configletBuilderList': [],
+ 'configletBuilderNamesList': [],
+ 'ignoreConfigletBuilderList': [],
+ 'ignoreConfigletBuilderNamesList': [],
+ 'toId': dev['systemMacAddress'],
+ 'toIdType': 'netelement',
+ 'fromId': '',
+ 'nodeName': '',
+ 'fromName': '',
+ 'toName': dev['fqdn'],
+ 'nodeIpAddress': dev['ipAddress'],
+ 'nodeTargetIpAddress': dev['ipAddress'],
+ 'childTasks': [],
+ 'parentTask': ''}]}
+ self.log.debug('remove_configlets_from_device: saveTopology data:\n%s'
+ % data['data'])
+ self._add_temp_action(data)
+ if create_task:
+ return self._save_topology_v2([])
+ return None
+ def apply_configlets_to_container(self, app_name, container,
+ new_configlets, create_task=True):
+ ''' Apply the configlets to the container.
+ Args:
+ app_name (str): The application name to use in info field.
+ container (dict): The container dict
+ new_configlets (list): List of configlet name and key pairs
+ create_task (bool): Determines whether or not to execute a save
+ and create the tasks (if any)
+ Returns:
+ response (dict): A dict that contains a status and a list of
+ task ids created (if any).
+ Ex: {u'data': {u'status': u'success', u'taskIds': [u'32']}}
+ '''
+ self.log.debug(
+ 'apply_configlets_to_container: container: %s names: %s' %
+ (container, new_configlets))
+ # Get all the configlets assigned to the device.
+ configlets = self.get_configlets_by_container_id(container['key'])
+ # Get a list of the names and keys of the configlets
+ # Static Configlets
+ cnames = []
+ ckeys = []
+ # ConfigletBuilder Configlets
+ bnames = []
+ bkeys = []
+ if configlets['configletList']:
+ for configlet in configlets['configletList']:
+ if configlet['type'] == 'Static':
+ cnames.append(configlet['name'])
+ ckeys.append(configlet['key'])
+ elif configlet['type'] == 'Builder':
+ bnames.append(configlet['name'])
+ bkeys.append(configlet['key'])
+ # Add the new configlets to the end of the arrays
+ for entry in new_configlets:
+ cnames.append(entry['name'])
+ ckeys.append(entry['key'])
+ info = '%s: Configlet Assign: to Container %s' % (app_name,
+ container['name'])
+ info_preview = '<b>Configlet Assign:</b> to Container' + container[
+ 'name']
+ data = {'data': [{'info': info,
+ 'infoPreview': info_preview,
+ 'note': '',
+ 'action': 'associate',
+ 'nodeType': 'configlet',
+ 'nodeId': '',
+ 'configletList': ckeys,
+ 'configletNamesList': cnames,
+ 'ignoreConfigletNamesList': [],
+ 'ignoreConfigletList': [],
+ 'configletBuilderList': bkeys,
+ 'configletBuilderNamesList': bnames,
+ 'ignoreConfigletBuilderList': [],
+ 'ignoreConfigletBuilderNamesList': [],
+ 'toId': container['key'],
+ 'toIdType': 'container',
+ 'fromId': '',
+ 'nodeName': '',
+ 'fromName': '',
+ 'toName': container['name'],
+ 'nodeIpAddress': '',
+ 'nodeTargetIpAddress': '',
+ 'childTasks': [],
+ 'parentTask': ''}]}
+ self.log.debug(
+ 'apply_configlets_to_container: saveTopology data:\n%s' %
+ data['data'])
+ self._add_temp_action(data)
+ if create_task:
+ return self._save_topology_v2([])
+ return data
+ # pylint: disable=too-many-locals
+ # pylint: disable=invalid-name
+ def remove_configlets_from_container(self, app_name, container,
+ del_configlets, create_task=True):
+ ''' Remove the configlets from the container.
+ Args:
+ app_name (str): The application name to use in info field.
+ container (dict): The container dict
+ del_configlets (list): List of configlet name and key pairs
+ create_task (bool): Determines whether or not to execute a save
+ and create the tasks (if any)
+ Returns:
+ response (dict): A dict that contains a status and a list of
+ task ids created (if any).
+ Ex: {u'data': {u'status': u'success', u'taskIds': [u'35']}}
+ '''
+ self.log.debug(
+ 'remove_configlets_from_container: container: %s names: %s' %
+ (container, del_configlets))
+ # Get all the configlets assigned to the device.
+ configlets = self.get_configlets_by_container_id(container['key'])
+ # Get a list of the names and keys of the configlets. Do not add
+ # configlets that are on the delete list.
+ keep_names = []
+ keep_keys = []
+ for configlet in configlets['configletList']:
+ key = configlet['key']
+ if next((ent for ent in del_configlets if ent['key'] == key),
+ None) is None:
+ keep_names.append(configlet['name'])
+ keep_keys.append(key)
+ # Remove the names and keys of the configlets to keep and build a
+ # list of the configlets to remove.
+ del_names = []
+ del_keys = []
+ for entry in del_configlets:
+ del_names.append(entry['name'])
+ del_keys.append(entry['key'])
+ info = '%s Configlet Remove: from Container %s' % (app_name,
+ container['name'])
+ info_preview = '<b>Configlet Remove:</b> from Container' + container[
+ 'name']
+ data = {'data': [{'info': info,
+ 'infoPreview': info_preview,
+ 'note': '',
+ 'action': 'associate',
+ 'nodeType': 'configlet',
+ 'nodeId': '',
+ 'configletList': keep_keys,
+ 'configletNamesList': keep_names,
+ 'ignoreConfigletNamesList': del_names,
+ 'ignoreConfigletList': del_keys,
+ 'configletBuilderList': [],
+ 'configletBuilderNamesList': [],
+ 'ignoreConfigletBuilderList': [],
+ 'ignoreConfigletBuilderNamesList': [],
+ 'toId': container['key'],
+ 'toIdType': 'container',
+ 'fromId': '',
+ 'nodeName': '',
+ 'fromName': '',
+ 'toName': container['name'],
+ 'nodeIpAddress': '',
+ 'nodeTargetIpAddress': '',
+ 'childTasks': [],
+ 'parentTask': ''}]}
+ self.log.debug(
+ 'remove_configlets_from_container: saveTopology data:\n%s'
+ % data['data'])
+ self._add_temp_action(data)
+ if create_task:
+ return self._save_topology_v2([])
+ return data
+ def validate_configlets_for_device(self, mac, configlet_keys,
+ page_type='viewConfig'):
+ ''' Validate and compare configlets for device.
+ Args:
+ mac (str): MAC address of device to validate configlets for.
+ configlet_keys (list): List of configlet keys
+ page_type (list): Possible Values of pageType - 'viewConfig',
+ 'managementIPValidation', 'validateConfig', etc...
+ Returns:
+ response (dict): A dict that contains ...
+ Ex: {"reconciledConfig": {...},
+ "reconcile": 0,
+ "new": 0,
+ "designedConfig": [{...}],
+ "total": 0,
+ "runningConfig": [{...}],
+ "isReconcileInvoked": true,
+ "mismatch": 0,
+ "warnings": [""],
+ "errors": [{"configletLineNo": 0,
+ "error": "string",
+ "configletId": "string"}, ...]
+ }
+ '''
+ self.log.debug('validate_configlets_for_device: '
+ 'MAC: %s - conf keys: %s - page_type: %s' %
+ (mac, configlet_keys, page_type))
+ data = {'configIdList': configlet_keys,
+ 'netElementId': mac,
+ 'pageType': page_type}
+ return
+ '/provisioning/v2/',
+ data=data, timeout=self.request_timeout)
+ def get_applied_devices(self, configlet_name, start=0, end=0):
+ ''' Returns a list of devices to which the named configlet is applied.
+ Args:
+ configlet_name (str): The name of the configlet to be queried.
+ start (int): Start index for the pagination. Default is 0.
+ end (int): End index for the pagination. If end index is 0
+ then all the records will be returned. Default is 0.
+ '''
+ return self.clnt.get('/configlet/'
+ 'configletName=%s&startIndex=%d&endIndex=%d'
+ % (configlet_name, start, end),
+ timeout=self.request_timeout)
+ def get_applied_containers(self, configlet_name, start=0, end=0):
+ ''' Returns a list of containers to which the named
+ configlet is applied.
+ Args:
+ configlet_name (str): The name of the configlet to be queried.
+ start (int): Start index for the pagination. Default is 0.
+ end (int): End index for the pagination. If end index is 0
+ then all the records will be returned. Default is 0.
+ '''
+ return self.clnt.get('/configlet/'
+ 'configletName=%s&startIndex=%d&endIndex=%d'
+ % (configlet_name, start, end),
+ timeout=self.request_timeout)
+ # pylint: disable=too-many-arguments
+ def _container_op(self, container_name, container_key, parent_name,
+ parent_key, operation):
+ ''' Perform the operation on the container.
+ Args:
+ container_name (str): Container name
+ container_key (str): Container key, can be empty for add.
+ parent_name (str): Parent container name
+ parent_key (str): Parent container key
+ operation (str): Container operation 'add' or 'delete'.
+ Returns:
+ response (dict): A dict that contains a status and a list of
+ task ids created (if any).
+ Ex: {u'data': {u'status': u'success', u'taskIds': []}}
+ '''
+ msg = ('%s container %s under container %s' %
+ (operation, container_name, parent_name))
+ data = {'data': [{'info': msg,
+ 'infoPreview': msg,
+ 'action': operation,
+ 'nodeType': 'container',
+ 'nodeId': container_key,
+ 'toId': '',
+ 'fromId': '',
+ 'nodeName': container_name,
+ 'fromName': '',
+ 'toName': '',
+ 'childTasks': [],
+ 'parentTask': '',
+ 'toIdType': 'container'}]}
+ if operation == 'add':
+ data['data'][0]['toId'] = parent_key
+ data['data'][0]['toName'] = parent_name
+ elif operation == 'delete':
+ data['data'][0]['fromId'] = parent_key
+ data['data'][0]['fromName'] = parent_name
+ # Perform the container operation
+ self._add_temp_action(data)
+ return self._save_topology_v2([])
+ def add_container(self, container_name, parent_name, parent_key):
+ ''' Add the container to the specified parent.
+ Args:
+ container_name (str): Container name
+ parent_name (str): Parent container name
+ parent_key (str): Parent container key
+ Returns:
+ response (dict): A dict that contains a status and a list of
+ task ids created (if any).
+ Ex: {u'data': {u'status': u'success', u'taskIds': []}}
+ '''
+ self.log.debug('add_container: container: %s parent: %s parent_key: %s'
+ % (container_name, parent_name, parent_key))
+ return self._container_op(container_name, 'new_container', parent_name,
+ parent_key, 'add')
+ def delete_container(self, container_name, container_key, parent_name,
+ parent_key):
+ ''' Add the container to the specified parent.
+ Args:
+ container_name (str): Container name
+ container_key (str): Container key
+ parent_name (str): Parent container name
+ parent_key (str): Parent container key
+ Returns:
+ response (dict): A dict that contains a status and a list of
+ task ids created (if any).
+ Ex: {u'data': {u'status': u'success', u'taskIds': []}}
+ '''
+ self.log.debug('delete_container: container: %s container_key: %s '
+ 'parent: %s parent_key: %s' %
+ (container_name, container_key, parent_name,
+ parent_key))
+ resp = self._container_op(container_name, container_key, parent_name,
+ parent_key, 'delete')
+ # As of CVP version 2020.1 the API endpoint stopped
+ # raising an Error when attempting to delete a container with children.
+ # To account for this try to see if the container being deleted
+ # still exists after the attempted delete. If it still exists
+ # raise an error similar to how CVP behaved prior to CVP 2020.1
+ try:
+ still_exists = self.get_container_by_id(container_key)
+ except CvpApiError as error:
+ if 'Invalid Container id' in error.msg:
+ return resp
+ else:
+ raise
+ if still_exists is not None:
+ raise CvpApiError('Container was not deleted. Check for children')
+ return resp
+ def get_parent_container_for_device(self, device_mac):
+ ''' Add the container to the specified parent.
+ Args:
+ device_mac (str): Device mac address
+ Returns:
+ response (dict): A dict that contains the parent container info
+ '''
+ self.log.debug('get_parent_container_for_device: called for %s'
+ % device_mac)
+ data = self.clnt.get('/provisioning/'
+ 'queryParam=%s&startIndex=0&endIndex=0'
+ % device_mac, timeout=self.request_timeout)
+ if data['total'] > 0:
+ cont_name = data['netElementContainerList'][0]['containerName']
+ return self.get_container_by_name(cont_name)
+ return None
+ def move_device_to_container(self, app_name, device, container,
+ create_task=True):
+ ''' Add the container to the specified parent.
+ Args:
+ app_name (str): String to specify info/signifier of calling app
+ device (dict): Device info
+ container (dict): Container info
+ create_task (bool): Determines whether or not to execute a save
+ and create the tasks (if any)
+ Returns:
+ response (dict): A dict that contains a status and a list of
+ task ids created (if any).
+ Ex: {u'data': {u'status': u'success', u'taskIds': []}}
+ '''
+ info = 'Device Add {} to container {} by {}'.format(device['fqdn'],
+ container['name'],
+ app_name)
+ self.log.debug('Attempting to move device %s to container %s'
+ % (device['fqdn'], container['name']))
+ if 'parentContainerId' in device:
+ from_id = device['parentContainerId']
+ else:
+ parent_cont = self.get_parent_container_for_device(device['key'])
+ from_id = parent_cont['key']
+ data = {'data': [{'info': info,
+ 'infoPreview': info,
+ 'action': 'update',
+ 'nodeType': 'netelement',
+ 'nodeId': device['key'],
+ 'toId': container['key'],
+ 'fromId': from_id,
+ 'nodeName': device['fqdn'],
+ 'toName': container['name'],
+ 'toIdType': 'container',
+ 'childTasks': [],
+ 'parentTask': ''}]}
+ try:
+ self._add_temp_action(data)
+ # pylint: disable=invalid-name
+ except CvpApiError as e:
+ if 'Data already exists' in str(e):
+ self.log.debug('Device %s already in container %s'
+ % (device['fqdn'], container))
+ if create_task:
+ return self._save_topology_v2([])
+ return None
+ def search_topology(self, query, start=0, end=0):
+ ''' Search the topology for items matching the query parameter.
+ Args:
+ query (str): Query parameter which is the name of the container
+ or device.
+ start (int): Start index for the pagination. Default is 0.
+ end (int): End index for the pagination. If end index is 0
+ then all the records will be returned. Default is 0.
+ Returns:
+ response (dict): A dict that contains the container and
+ netelement lists.
+ '''
+ self.log.debug('search_topology: query: %s start: %d end: %d' %
+ (query, start, end))
+ data = self.clnt.get('/provisioning/'
+ 'startIndex=%d&endIndex=%d'
+ % (qplus(query), start, end),
+ timeout=self.request_timeout)
+ if 'netElementList' in data:
+ for device in data['netElementList']:
+ device['status'] = device['deviceStatus']
+ device['mlagEnabled'] = device['isMLAGEnabled']
+ device['danzEnabled'] = device['isDANZEnabled']
+ device['parentContainerKey'] = device['parentContainerId']
+ device['bootupTimestamp'] = device['bootupTimeStamp']
+ device['internalBuild'] = device['internalBuildId']
+ return data
+ def filter_topology(self, node_id='root', fmt='topology',
+ start=0, end=0):
+ ''' Filter the CVP topology for container and device information.
+ Args:
+ node_id (str): The container key to base the filter in.
+ Default is 'root', for the Tenant container.
+ fmt (str): The type of filter to return. Must be either
+ 'topology' or 'list'. Default is 'topology'.
+ start (int): Start index for the pagination. Default is 0.
+ end (int): End index for the pagination. If end index is 0
+ then all the records will be returned. Default is 0.
+ '''
+ url = ('/provisioning/'
+ 'format=%s&startIndex=%d&endIndex=%d'
+ % (node_id, fmt, start, end))
+ return self.clnt.get(url, timeout=self.request_timeout)
+ def check_compliance(self, node_key, node_type):
+ ''' Check that a device is in compliance, that is the configlets
+ applied to the device match the devices running configuration.
+ Args:
+ node_key (str): The device key. This is the device MAC address
+ Example: ff:ff:ff:ff:ff:ff
+ node_type (str): The device type. This is either 'netelement'
+ or 'container'
+ Returns:
+ response (dict): A dict that contains the results of the
+ compliance check.
+ '''
+ self.log.debug('check_compliance: node_key: %s node_type: %s' %
+ (node_key, node_type))
+ data = {'nodeId': node_key, 'nodeType': node_type}
+ resp ='/provisioning/', data=data,
+ timeout=self.request_timeout)
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ if self.clnt.apiversion >= 2.0:
+ if resp['complianceIndication'] == u'':
+ resp['complianceIndication'] = 'NONE'
+ return resp
+ def get_event_by_id(self, e_id):
+ ''' Return information on the requested event ID.
+ Args:
+ e_id (str): The event id to be queried.
+ '''
+ return self.clnt.get('/event/' % e_id,
+ timeout=self.request_timeout)
+ def get_default_snapshot_template(self):
+ ''' Return the default snapshot template.
+ '''
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ if self.clnt.apiversion == 1.0:
+ self.log.debug('v1 Inventory API Call')
+ url = ('/snapshot/'
+ 'startIndex=0&endIndex=0')
+ return self.clnt.get(url, timeout=self.request_timeout)
+ self.log.debug('v2 Inventory API Call')
+ self.log.debug('API'
+ ' deprecated for CVP 2018.2 and beyond')
+ return None
+ # pylint: disable=invalid-name
+ def capture_container_level_snapshot(self, template_key, container_key):
+ ''' Initialize a container level snapshot event.
+ Args:
+ template_key (str): The snapshot template key to be used for
+ the snapshots.
+ container_key (str): The container key to start the
+ snapshots on.
+ '''
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ if self.clnt.apiversion == 1.0:
+ self.log.debug('v1 Inventory API Call')
+ data = {
+ 'templateId': template_key,
+ 'containerId': container_key,
+ }
+ return'/snapshot/',
+ data=data, timeout=self.request_timeout)
+ self.log.debug('v2 Inventory API Call')
+ self.log.debug('API'
+ ' deprecated for CVP 2018.2 and beyond')
+ return None
+ def add_image(self, filepath):
+ ''' Add an image to a CVP cluster.
+ Args:
+ filepath (str): Local path to the image to upload.
+ Returns:
+ data (dict): Dictionary of image add data.
+ '''
+ # Get the absolute file path to be uploaded
+ image_path = os.path.abspath(filepath)
+ image_data = open(image_path, 'rb')
+ response ='/image/',
+ files={'file': image_data})
+ return response
+ def cancel_image(self, image_name):
+ ''' Discard/cancel the uploaded image/image bundle before save.
+ Args:
+ image_name (string): Name of image to cancel/discard.
+ Returns:
+ data (dict): Success or error message.
+ '''
+ image_data = {'data': image_name}
+ return'/image/', data=image_data,
+ timeout=self.request_timeout)
+ def get_images(self, start=0, end=0):
+ ''' Return a list of all images.
+ Args:
+ start (int): Start index for the pagination. Default is 0.
+ end (int): End index for the pagination. If end index is 0
+ then all the records will be returned. Default is 0.
+ Returns:
+ images (dict): The 'total' key contains the number of images,
+ the 'data' key contains a list of images and their info.
+ '''
+ self.log.debug('Get info about images')
+ return self.clnt.get('/image/'
+ 'endIndex=%d' % (start, end),
+ timeout=self.request_timeout)
+ def get_image_bundles(self, start=0, end=0):
+ ''' Return a list of all image bundles.
+ Args:
+ start (int): Start index for the pagination. Default is 0.
+ end (int): End index for the pagination. If end index is 0
+ then all the records will be returned. Default is 0.
+ Returns:
+ image bundles (dict): The 'total' key contains the number of
+ image bundles, the 'data' key contains a list of image
+ bundles and their info.
+ '''
+ self.log.debug('Get image bundles that can be applied to devices or'
+ ' containers')
+ return self.clnt.get('/image/'
+ 'startIndex=%d&endIndex=%d' % (start, end),
+ timeout=self.request_timeout)
+ def get_image_bundle_by_name(self, name):
+ ''' Return a dict of info about an image bundle.
+ Args:
+ name (str): Name of image bundle to return info about.
+ Returns:
+ image bundle (dict): Dict of info specific to the image bundle
+ requested or None if the name requested doesn't exist.
+ '''
+ self.log.debug('Attempt to get image bundle %s' % name)
+ try:
+ image = self.clnt.get('/image/'
+ % qplus(name), timeout=self.request_timeout)
+ except CvpApiError as error:
+ # Catch an invalid task_id error and return None
+ if 'Entity does not exist' in str(error):
+ self.log.debug('Bundle with name %s does not exist' % name)
+ return None
+ raise error
+ return image
+ def delete_image_bundle(self, image_key, image_name):
+ ''' Delete image bundle
+ Args:
+ image_key (str): The key of the image bundle to be deleted.
+ image_name (str): The name of the image bundle to be deleted.
+ '''
+ bundle_data = {
+ 'data': [{'key': image_key,
+ 'name': image_name}]
+ }
+ return'/image/', data=bundle_data,
+ timeout=self.request_timeout)
+ def save_image_bundle(self, name, images, certified=True):
+ ''' Save an image bundle to a cluster.
+ Args:
+ name (str): The name of the image bundle to be saved.
+ images (list): A list of image names to include in the bundle.
+ certified (bool): Whether the image bundle is certified or
+ not. Default is True.
+ '''
+ certified_image = 'true' if certified else 'false'
+ data = {
+ 'name': name,
+ 'isCertifiedImage': certified_image,
+ 'images': images,
+ }
+ return'/image/', data=data,
+ timeout=self.request_timeout)
+ def update_image_bundle(self, bundle_id, name, images, certified=True):
+ ''' Update an existing image bundle
+ '''
+ certified_image = 'true' if certified else 'false'
+ data = {
+ 'id': bundle_id,
+ 'name': name,
+ 'isCertifiedImage': certified_image,
+ 'images': images,
+ }
+ return'/image/', data=data,
+ timeout=self.request_timeout)
+ def apply_image_to_device(self, image, device, create_task=True):
+ ''' Apply an image bundle to a device
+ Args:
+ image (dict): The image info.
+ device (dict): Info about device to apply image to.
+ create_task (bool): Determines whether or not to execute a save
+ and create the tasks (if any)
+ Returns:
+ response (dict): A dict that contains a status and a list of
+ task ids created (if any). Image updates will not run until
+ task or tasks are executed.
+ Ex: {u'data': {u'status': u'success', u'taskIds': [u'32']}}
+ '''
+ return self.apply_image_to_element(image, device, device['fqdn'],
+ 'netelement', create_task)
+ def apply_image_to_container(self, image, container, create_task=True):
+ ''' Apply an image bundle to a container
+ Args:
+ image (dict): The image info.
+ container (dict): Info about container to apply image to.
+ create_task (bool): Determines whether or not to execute a save
+ and create the tasks (if any)
+ Returns:
+ response (dict): A dict that contains a status and a list of
+ task ids created (if any). Image updates will not run until
+ task or tasks are executed.
+ Ex: {u'data': {u'status': u'success', u'taskIds': [u'32']}}
+ '''
+ return self.apply_image_to_element(image, container, container['name'],
+ 'container', create_task)
+ def apply_image_to_element(self, image, element, name, id_type,
+ create_task=True):
+ ''' Apply an image bundle to a device or container.
+ Args:
+ image (dict): The image info.
+ element (dict): Info about element to apply image to. Dict
+ can contain device info or container info.
+ name (str): Name of element image is being applied to.
+ id_type (str): Id type of element image is being applied to.
+ create_task (bool): Determines whether or not to execute a save
+ and create the tasks (if any)
+ Returns:
+ response (dict): A dict that contains a status and a list of
+ task ids created (if any). Image updates will not run until
+ task or tasks are executed.
+ Ex: {u'data': {u'status': u'success', u'taskIds': [u'32']}}
+ '''
+ self.log.debug('Attempt to apply %s to %s %s' % (image['name'],
+ id_type, name))
+ info = 'Apply image: %s to %s %s' % (image['name'], id_type, name)
+ node_id = ''
+ if 'imageBundleKeys' in image:
+ if image['imageBundleKeys']:
+ node_id = image['imageBundleKeys'][0]
+'Provided image is an image object.'
+ ' Using first value from imageBundleKeys - %s'
+ % node_id)
+ if 'id' in image:
+ node_id = image['id']
+'Provided image is an image bundle object.'
+ ' Found v1 API id field - %s' % node_id)
+ elif 'key' in image:
+ node_id = image['key']
+'Provided image is an image bundle object.'
+ ' Found v2 API key field - %s' % node_id)
+ data = {'data': [{'info': info,
+ 'infoPreview': info,
+ 'note': '',
+ 'action': 'associate',
+ 'nodeType': 'imagebundle',
+ 'nodeId': node_id,
+ 'toId': element['key'],
+ 'toIdType': id_type,
+ 'fromId': '',
+ 'nodeName': image['name'],
+ 'fromName': '',
+ 'toName': name,
+ 'childTasks': [],
+ 'parentTask': ''}]}
+ self._add_temp_action(data)
+ if create_task:
+ return self._save_topology_v2([])
+ return None
+ def remove_image_from_device(self, image, device):
+ ''' Remove the image bundle from the specified device.
+ Args:
+ image (dict): The image info.
+ device (dict): The device info.
+ Returns:
+ response (dict): A dict that contains a status and a list of
+ task ids created (if any).
+ Ex: {u'data': {u'status': u'success', u'taskIds': [u'32']}}
+ '''
+ return self.remove_image_from_element(image, device, device['fqdn'],
+ 'netelement')
+ def remove_image_from_container(self, image, container):
+ ''' Remove the image bundle from the specified container.
+ Args:
+ image (dict): The image info.
+ container (dict): The container info.
+ Returns:
+ response (dict): A dict that contains a status and a list of
+ task ids created (if any).
+ Ex: {u'data': {u'status': u'success', u'taskIds': [u'32']}}
+ '''
+ return self.remove_image_from_element(image, container,
+ container['name'], 'container')
+ def remove_image_from_element(self, image, element, name, id_type):
+ ''' Remove the image bundle from the specified container.
+ Args:
+ image (dict): The image info.
+ element (dict): The container info.
+ name (): name.
+ id_type (): type.
+ Returns:
+ response (dict): A dict that contains a status and a list of
+ task ids created (if any).
+ Ex: {u'data': {u'status': u'success', u'taskIds': [u'32']}}
+ '''
+ self.log.debug('Attempt to remove %s from %s' % (image['name'], name))
+ info = 'Remove image: %s from %s' % (image['name'], name)
+ node_id = ''
+ if 'imageBundleKeys' in image:
+ if image['imageBundleKeys']:
+ node_id = image['imageBundleKeys'][0]
+'Provided image is an image object.'
+ ' Using first value from imageBundleKeys - %s'
+ % node_id)
+ if 'id' in image:
+ node_id = image['id']
+'Provided image is an image bundle object.'
+ ' Found v1 API id field - %s' % node_id)
+ elif 'key' in image:
+ node_id = image['key']
+'Provided image is an image bundle object.'
+ ' Found v2 API key field - %s' % node_id)
+ data = {'data': [{'info': info,
+ 'infoPreview': info,
+ 'note': '',
+ 'action': 'associate',
+ 'nodeType': 'imagebundle',
+ 'nodeId': '',
+ 'toId': element['key'],
+ 'toIdType': id_type,
+ 'fromId': '',
+ 'nodeName': '',
+ 'fromName': '',
+ 'toName': name,
+ 'ignoreNodeId': node_id,
+ 'ignoreNodeName': image['name'],
+ 'childTasks': [],
+ 'parentTask': ''}]}
+ self._add_temp_action(data)
+ return self._save_topology_v2([])
+ def get_change_controls(self, query='', start=0, end=0):
+ ''' Returns a list of change controls.
+ Args:
+ query (str): Query to look for in change control names
+ start (int): Start index for the pagination. Default is 0.
+ end (int): End index for the pagination. If end index is 0
+ then all the records will be returned. Default is 0.
+ Returns:
+ change controls (list): The list of change controls
+ '''
+ self.log.debug('get_change_controls: query: %s' % query)
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ if self.clnt.apiversion >= 3.0:
+ self.log.debug('v3/v4/v5 getChangeControls API Call')
+ self.log.warning(
+ 'get_change_controls: change control APIs moved for v3/v4/v5')
+ return None
+ self.log.debug('v2 getChangeControls API Call')
+ data = self.clnt.get(
+ '/changeControl/'
+ '&startIndex=%d&endIndex=%d' % (qplus(query), start, end),
+ timeout=self.request_timeout)
+ if 'data' not in data:
+ return None
+ return data['data']
+ def change_control_available_tasks(self, query='', start=0, end=0):
+ ''' Returns a list of tasks that are available for a change control.
+ Args:
+ query (str): Query to look for in task
+ start (int): Start index for the pagination. Default is 0.
+ end (int): End index for the pagination. If end index is 0
+ then all the records will be returned. Default is 0.
+ Returns:
+ tasks (list): The list of available tasks
+ '''
+ self.log.debug('change_control_available_tasks: query: %s' % query)
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ if self.clnt.apiversion >= 3.0:
+ self.log.debug(
+ 'v3/v4/v5 uses existing get_task_by_status API Call')
+ return self.get_tasks_by_status('PENDING')
+ self.log.debug('v2 getTasksByStatus API Call')
+ data = self.clnt.get(
+ '/changeControl/'
+ '&startIndex=%d&endIndex=%d' % (qplus(query), start, end),
+ timeout=self.request_timeout)
+ if 'data' not in data:
+ return None
+ return data['data']
+ def create_change_control(self, name, change_control_tasks, timezone,
+ country_id, date_time, snapshot_template_key='',
+ change_control_type='Custom',
+ stop_on_error='false'):
+ ''' Create change control with provided information and return
+ change control ID.
+ Args:
+ name (string): The name for the new change control.
+ change_control_tasks (list): A list of key value pairs where
+ the key is the Task ID and the value is the task order
+ as an integer.
+ Ex: [{'taskId': '100', 'taskOrder': 1},
+ {'taskId': '101', 'taskOrder': 1},
+ {'taskId': '102', 'taskOrder': 2}]
+ timezone (string): The timezone as a string.
+ Ex: "America/New_York"
+ country_id (string): The country ID.
+ Ex: "United States"
+ date_time (string): The date and time for execution.
+ Time is military time format.
+ Ex: "2018-08-22 11:30"
+ snapshot_template_key (string): ???
+ change_control_type (string): The type of change control being
+ created. Options are "Custom" or "Rollback".
+ stop_on_error (string): String representation of a boolean
+ to set whether this change control will stop if an error is
+ encountered in one of its tasks.
+ Returns:
+ response (dict): A dict that contains...
+ Ex: {"data": "success", "ccId": "4"}
+ '''
+ self.log.debug('create_change_control')
+ # {
+ # "timeZone": "America/New_York",
+ # "countryId": "United States",
+ # "dateTime": "2018-08-22 11:30",
+ # "ccName": "test2",
+ # "snapshotTemplateKey": "",
+ # "type": "Custom",
+ # "stopOnError": "false",
+ # "deletedTaskIds": [],
+ # "changeControlTasks": [
+ # {
+ # "taskId": "126",
+ # "taskOrder": 1,
+ # "snapshotTemplateKey": "",
+ # "clonedCcId": ""
+ # }
+ # ]
+ # }
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ if self.clnt.apiversion >= 3.0:
+ self.log.debug('v3/v4/v5 addOrUpdateChangeControl API Call')
+ self.log.warning('create_change_control:'
+ ' change control APIs moved for v3/v4/v5')
+ return None
+ self.log.debug('v2 addOrUpdateChangeControl API Call')
+ task_data_list = []
+ for task_info in change_control_tasks:
+ task_list_entry = {'taskId': task_info['taskId'],
+ 'taskOrder': task_info['taskOrder'],
+ 'snapshotTemplateKey': snapshot_template_key,
+ 'clonedCcId': ''}
+ task_data_list.append(task_list_entry)
+ data = {'timeZone': timezone,
+ 'countryId': country_id,
+ 'dateTime': date_time,
+ 'ccName': name,
+ 'snapshotTemplateKey': snapshot_template_key,
+ 'type': change_control_type,
+ 'stopOnError': stop_on_error,
+ 'deletedTaskIds': [],
+ 'changeControlTasks': task_data_list}
+ return'/changeControl/',
+ data=data, timeout=self.request_timeout)
+ def create_change_control_v3(self, cc_id, name, tasks, sequential=True):
+ ''' Create change control with provided information and return
+ change control ID.
+ Args:
+ cc_id (string): The ID for the new change control.
+ name (string): The name for the new change control.
+ tasks (list): A list of Task IDs as strings
+ Ex: ['10', '11', '12']
+ sequential (bool): A flag for running tasks sequentially or
+ in parallel. Defaults to True for running sequentially.
+ Returns:
+ response (dict): A dict that contains...
+ Ex: [{u'id': u'cc_id',
+ u'update_timestamp': u'...'}]
+ '''
+ self.log.debug('create_change_control_v3')
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ if self.clnt.apiversion < 3.0:
+ self.log.debug('Wrong method for API version %s.'
+ ' Use create_change_control method',
+ self.clnt.apiversion)
+ self.log.warning('create_change_control_v3:'
+ ' Use old change control APIs for old versions')
+ return None
+ self.log.debug('v3 Update change control API Call')
+ stages = []
+ if sequential:
+ for index, task in enumerate(tasks):
+ stage_id = 'stage%d' % index
+ stage = {'stage': [{
+ 'id': stage_id,
+ 'action': {
+ 'name': 'task',
+ 'args': {
+ 'TaskID': task,
+ }
+ }
+ }]}
+ stages.append(stage)
+ else:
+ stage_rows = []
+ for index, task in enumerate(tasks):
+ stage_id = 'stage%d' % index
+ stage_row = {
+ 'id': stage_id,
+ 'action': {
+ 'name': 'task',
+ 'args': {
+ 'TaskID': task,
+ }
+ }
+ }
+ stage_rows.append(stage_row)
+ stages.append({'stage': stage_rows})
+ data = {'config': {
+ 'id': cc_id,
+ 'name': name,
+ 'root_stage': {
+ 'id': 'root',
+ 'stage_row': stages,
+ }
+ }}
+ return'/api/v3/services/ccapi.ChangeControl/Update',
+ data=data, timeout=self.request_timeout)
+ def add_notes_to_change_control(self, cc_id, notes):
+ ''' Add provided notes to the specified change control.
+ Args:
+ cc_id (string): The id for the change control to add notes to.
+ notes (string): The notes to add to the change control.
+ Returns:
+ response (dict): A dict that contains...
+ Ex: {"data": "success"}
+ '''
+ self.log.debug('add_notes_to_change_control: cc_id %s, notes %s'
+ % (cc_id, notes))
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ if self.clnt.apiversion >= 3.0:
+ self.log.debug(
+ 'v3/v4/v5 addNotesToChangeControl API Call deprecated')
+ self.log.warning('add_notes_to_change_control:'
+ ' change control APIs not supported for v3/v4/v5')
+ return None
+ self.log.debug('v2 addNotesToChangeControl API Call')
+ data = {'ccId': cc_id,
+ 'notes': notes}
+ return'/changeControl/',
+ data=data, timeout=self.request_timeout)
+ def execute_change_controls(self, cc_ids):
+ ''' Execute the change control indicated by its ccId.
+ Args:
+ cc_ids (list): A list of change control IDs to be executed.
+ '''
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ if self.clnt.apiversion >= 3.0:
+ self.log.debug(
+ 'v3/v4/v5 /api/v3/services/ccapi.ChangeControl/Start API Call')
+ for cc_id in cc_ids:
+ resp_list = []
+ data = {'cc_id': cc_id}
+ resp =
+ '/api/v3/services/ccapi.ChangeControl/Start',
+ data=data, timeout=self.request_timeout)
+ resp_list.append(resp)
+ return resp_list
+ self.log.debug('v2 executeCC API Call')
+ cc_id_list = [{'ccId': x} for x in cc_ids]
+ data = {'ccIds': cc_id_list}
+ return'/changeControl/', data=data,
+ timeout=self.request_timeout)
+ def approve_change_control(self, cc_id, timestamp=None):
+ ''' Cancel the provided change controls.
+ Args:
+ cc_id (string): The change control IDs to be approved.
+ timestamp(string): The change controls timestamp.
+ '''
+ if not timestamp:
+ timestamp = datetime.utcnow().isoformat() + 'Z'
+ self.log.debug('approve_change_control')
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ if self.clnt.apiversion < 3.0:
+ self.log.debug('Approval methods not valid for API version %s.'
+ ' Functionality did not exist',
+ self.clnt.apiversion)
+ return None
+ self.log.debug('v3 Approve change control API Call')
+ data = {'cc_id': cc_id, 'cc_timestamp': timestamp}
+ return
+ '/api/v3/services/ccapi.ChangeControl/AddApproval',
+ data=data, timeout=self.request_timeout)
+ def delete_change_control_approval(self, cc_id):
+ ''' Cancel the provided change controls.
+ Args:
+ cc_id (string): The change control IDs to be approved.
+ '''
+ self.log.debug('delete_change_control_approval')
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ if self.clnt.apiversion < 3.0:
+ self.log.debug('Approval methods not valid for API version %s.'
+ ' Functionality did not exist',
+ self.clnt.apiversion)
+ return None
+ self.log.debug('v3 Delete Approval for change control API Call')
+ data = {'cc_id': cc_id}
+ return
+ '/api/v3/services/ccapi.ChangeControl/DeleteApproval',
+ data=data, timeout=self.request_timeout)
+ def cancel_change_controls(self, cc_ids):
+ ''' Cancel the provided change controls.
+ Args:
+ cc_ids (list): A list of change control IDs to be cancelled.
+ '''
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ if self.clnt.apiversion >= 3.0:
+ self.log.debug(
+ 'v3/v4/v5 /api/v3/services/ccapi.ChangeControl/Stop API Call')
+ resp_list = []
+ for cc_id in cc_ids:
+ data = {'cc_id': cc_id}
+ resp =
+ '/api/v3/services/ccapi.ChangeControl/Stop',
+ data=data, timeout=self.request_timeout)
+ resp_list.append(resp)
+ return resp_list
+ self.log.debug('v2 cancelChangeControl API Call')
+ data = {'ccIds': cc_ids}
+ return'/changeControl/',
+ data=data, timeout=self.request_timeout)
+ def delete_change_controls(self, cc_ids):
+ ''' Delete the provided change controls.
+ Args:
+ cc_ids (list): A list of change control IDs to be deleted.
+ '''
+ msg = 'Change Control Resource APIs supported from 2021.2.0 or newer.'
+ if self.cvp_version_compare('>=', 6.0, msg):
+ self.log.debug(
+ 'v6+ Using Resource API Change Control Delete API Call')
+ resp_list = []
+ for cc_id in cc_ids:
+ resp = self.change_control_delete(cc_id)
+ resp_list.append(resp)
+ return resp_list
+ msg = 'Change Control Service APIs supported from 2019.0.0 to 2021.2.0'
+ if self.cvp_version_compare('>=', 3.0, msg):
+ self.log.debug(
+ 'v3/v4/v5 /api/v3/services/ccapi.ChangeControl/Delete'
+ ' API Call')
+ resp_list = []
+ for cc_id in cc_ids:
+ data = {'cc_id': cc_id}
+ resp =
+ '/api/v3/services/ccapi.ChangeControl/Delete',
+ data=data, timeout=self.request_timeout)
+ resp_list.append(resp)
+ return resp_list
+ self.log.debug('v2 deleteChangeControl API Call')
+ data = {'ccIds': cc_ids}
+ return'/changeControl/',
+ data=data, timeout=self.request_timeout)
+ def get_change_control_info(self, cc_id):
+ ''' Get the detailed information for a single change control.
+ Args:
+ cc_id (string): The id for the change control to be retrieved.
+ Returns:
+ response (dict): A dict that contains...
+ Ex: {'ccId': '4',
+ 'ccName': 'test_api_1541106830',
+ 'changeControlTasks': {'data': [<task data>],
+ 'total': 1},
+ 'classId': 68,
+ 'containerName': '',
+ 'countryId': '',
+ 'createdBy': 'cvpadmin',
+ 'createdTimestamp': 1541106831629,
+ 'dateTime': '',
+ 'deviceCount': 1,
+ 'executedBy': 'cvpadmin',
+ 'executedTimestamp': 1541106831927,
+ 'factoryId': 1,
+ 'id': 68,
+ 'key': '4',
+ 'notes': '',
+ 'postSnapshotEndTime': 0,
+ 'postSnapshotStartTime': 0,
+ 'preSnapshotEndTime': 0,
+ 'preSnapshotStartTime': 0,
+ 'progressStatus': {<status>},
+ 'scheduledBy': '',
+ 'scheduledByPassword': '',
+ 'scheduledTimestamp': 0,
+ 'snapshotTemplateKey': '',
+ 'snapshotTemplateName': None,
+ 'status': 'Inprogress',
+ 'stopOnError': False,
+ 'taskCount': 1,
+ 'taskEndTime': 0,
+ 'taskStartTime': 0,
+ 'timeZone': '',
+ 'type': 'Custom'}
+ '''
+ self.log.debug('get_change_control_info: %s', cc_id)
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ if self.clnt.apiversion >= 3.0:
+ self.log.debug('get_change_control_info method deprecated for'
+ ' v3/v4/v5. Moved to get_change_control_status')
+ self.log.warning('get_change_control_info:'
+ ' info change control API moved for v3/v4/v5 to'
+ ' status')
+ return None
+ self.log.debug('v2 API Call')
+ try:
+ resp = self.clnt.get(
+ '/changeControl/'
+ 'startIndex=0&endIndex=0&ccId=%s' % cc_id,
+ timeout=self.request_timeout)
+ except CvpApiError as error:
+ if 'No data found' in error.msg:
+ return None
+ raise
+ return resp
+ def get_change_control_status(self, cc_id):
+ ''' Get the detailed information for a single change control.
+ Args:
+ cc_id (string): The id for the change control to be retrieved.
+ Returns:
+ response (dict): A dict that contains...
+ Ex:
+ [{u'status': {u'error': u'',
+ u'id': u'cc_id',
+ u'stages': {u' ': {
+ u'error': u'',
+ u'state': u'Completed'},
+ u'Task_0_1': {
+ u'error': u'',
+ u'state': u'Completed'}
+ },
+ u'state': u'Completed'}}]
+ '''
+ self.log.debug('get_change_control_status: %s', cc_id)
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ if self.clnt.apiversion < 3.0:
+ self.log.debug('get_change_control_status method not supported'
+ ' for API version %s. Use old'
+ ' get_change_control_info method'
+ % self.clnt.apiversion)
+ return None
+ self.log.debug(
+ 'v3 /api/v3/services/ccapi.ChangeControl/GetStatus API Call')
+ data = {'cc_id': cc_id}
+ return
+ '/api/v3/services/ccapi.ChangeControl/GetStatus',
+ data=data, timeout=self.request_timeout)
+ def reset_device(self, app_name, device, create_task=True):
+ ''' Reset device by moving it to the Undefined Container.
+ Args:
+ app_name (str): String to specify info/signifier of calling app
+ device (dict): Device info
+ create_task (bool): Determines whether or not to execute a save
+ and create the tasks (if any)
+ Returns:
+ response (dict): A dict that contains a status and a list of
+ task ids created (if any).
+ Ex: {u'data': {u'status': u'success', u'taskIds': []}}
+ '''
+ info = ('App %s resetting device %s and moving it to Undefined'
+ % (app_name, device['fqdn']))
+ self.log.debug(info)
+ if 'parentContainerId' in device:
+ from_id = device['parentContainerId']
+ else:
+ parent_cont = self.get_parent_container_for_device(device['key'])
+ if parent_cont and 'key' in parent_cont:
+ from_id = parent_cont['key']
+ else:
+ from_id = ''
+ data = {'data': [{'info': info,
+ 'infoPreview': info,
+ 'action': 'reset',
+ 'nodeType': 'netelement',
+ 'nodeId': device['key'],
+ 'toId': 'undefined_container',
+ 'fromId': from_id,
+ 'nodeName': device['fqdn'],
+ 'toName': 'Undefined',
+ 'toIdType': 'container',
+ 'childTasks': [],
+ 'parentTask': ''}]}
+ try:
+ self._add_temp_action(data)
+ except CvpApiError as error:
+ if 'Data already exists' in str(error):
+ self.log.debug('Device %s already in container Undefined'
+ % device['fqdn'])
+ if create_task:
+ return self._save_topology_v2([])
+ return None
+ def deploy_device(self, device, container, configlets=None,
+ image_bundle=None, create_task=True,
+ app_name='Deploy_device'):
+ ''' Move a device from the undefined container to a target container.
+ Optionally apply device-specific configlets and an image.
+ Args:
+ device (dict): unique key for the device
+ container (str): name of container to move device to
+ configlets (list): list of dicts with configlet key/name pairs
+ image_bundle (str): name of image bundle to apply to device
+ create_task (boolean): Create task for this deploy device
+ sequence.
+ app_name (str): calling application name for logging purposes
+ Returns:
+ response (dict): A dict that contains a status and a list of
+ task ids created (if any).
+ Ex: {u'data': {u'status': u'success', u'taskIds': [u'32']}}
+ '''
+ info = 'Deploy device %s to container %s' % (device['fqdn'], container)
+ self.log.debug(info)
+ container_info = self.get_container_by_name(container)
+ # Add action for moving device to specified container
+ self.move_device_to_container(app_name, device, container_info,
+ create_task=False)
+ # Get proposed configlets device will inherit from container it is
+ # being moved to.
+ prop_conf = self.clnt.get('/provisioning/getTempConfigsByNetElementId.'
+ 'do?netElementId=%s' % device['key'])
+ new_configlets = prop_conf['proposedConfiglets']
+ if configlets:
+ new_configlets.extend(configlets)
+ self.apply_configlets_to_device('deploy_device', device,
+ new_configlets, create_task=False)
+ # Apply image to the device
+ if image_bundle:
+ image_bundle_info = self.get_image_bundle_by_name(image_bundle)
+ self.apply_image_to_device(image_bundle_info, device,
+ create_task=False)
+ if create_task:
+ return self._save_topology_v2([])
+ return None
+ def create_enroll_token(self, duration, devices=None):
+ ''' Create TerminAttr enrollment token for device authentication
+ via certificates.
+ Args:
+ devices (list): list of device Serial Numbers for which the
+ token should be generated. The default is all devices.
+ duration (string): the token's validity time (max 1 month),
+ accepted formats are: "24h", "86400s", "60m"
+ Returns:
+ response (list) on CVaaS: A list that contains the generated
+ enrollment token.
+ Ex: [{'enrollmentToken':{'token': <token>, 'groups': [],
+ 'reenrollDevices': <devices list>,
+ 'validFor': <duration e.g 24h>, 'field_mask': None}}]
+ response (dict) on CV on-prem: A dictionary that contains the
+ generated enrollment token.
+ Ex: {'data': <token>}
+ '''
+ if not devices:
+ devices = ["*"]
+ # For on-prem check the version as it is only supported from 2021.2.0+
+ if not self.clnt.is_cvaas:
+ if self.clnt.apiversion is None:
+ self.get_cvp_info()
+ if self.clnt.apiversion >= 6.0:
+ self.log.debug('v6 /cvpservice/enroll/createToken')
+ data = {"reenrollDevices": devices, "duration": duration}
+ return'/cvpservice/enroll/createToken',
+ data=data, timeout=self.request_timeout)
+ self.log.warning(
+ 'Enrollment Tokens only supported on CVP 2021.2.0+')
+ return None
+ data = {
+ "enrollmentToken": {"reenrollDevices": devices,
+ "validFor": duration}
+ }
+ return
+ '/api/v3/services/admin.Enrollment/AddEnrollmentToken',
+ data=data, timeout=self.request_timeout)
+ def get_all_tags(self, element_type='ELEMENT_TYPE_UNSPECIFIED', workspace_id=''):
+ ''' Get all device and/or interface tags from the mainline workspace or all other workspaces
+ Args:
+ element_type (str): Can be ELEMENT_TYPE_DEVICE, ELEMENT_TYPE_INTERFACE and
+ set to ELEMENT_TYPE_UNSPECIFIED by default which fetches all tags
+ workspace_id (str): The ID of the workspace, by default it is set to an empty string
+ which will use the mainline workspace
+ Returns:
+ response (dict): A dict that contains a list of key-value tags
+ '''
+ msg = 'Tag.V2 Resource APIs are supported from 2021.2.0 or newer.'
+ # For on-prem check the version as it is only supported from 2021.2.0+
+ if self.cvp_version_compare('>=', 6.0, msg):
+ tag_url = '/api/resources/tag/v2/Tag/all'
+ payload = {
+ "partialEqFilter": [
+ {
+ "key": {
+ "elementType": element_type,
+ "workspaceId": workspace_id
+ }
+ }
+ ]
+ }
+ self.log.debug('v6 {}'.format(tag_url))
+ return, data=payload)
+ def get_tag_edits(self, workspace_id):
+ ''' Show all tags edits in a workspace
+ Args:
+ workspace_id: The ID of the workspace
+ Returns:
+ response (dict): A dict that contains...
+ Ex.: {'data': [{'result': {'value': {'key': {'workspaceId': 'testget',
+ 'elementType': 'string', 'label': 'string', 'value': 'string'},
+ 'remove': False}, 'time': 'rfc3339 time', 'type': 'INITIAL'}}]}
+ '''
+ msg = 'Tag.V2 Resource APIs are supported from 2021.2.0 or newer.'
+ # For on-prem check the version as it is only supported from 2021.2.0+
+ if self.cvp_version_compare('>=', 6.0, msg):
+ tag_url = '/api/resources/tag/v2/TagConfig/all'
+ payload = {
+ "partialEqFilter": [
+ {
+ "key": {
+ "workspace_id": workspace_id
+ }
+ }
+ ]
+ }
+ self.log.debug('v6 ' + tag_url + ' ' + str(payload))
+ return, data=payload)
+ def get_tag_assignment_edits(self, workspace_id):
+ ''' Show all tags assignment edits in a workspace
+ Args:
+ workspace_id: The ID of the workspace
+ Returns:
+ response (dict): A dict that contains...
+ Ex: {'result': {'value': {'key': {'workspaceId': 'string', 'elementType': 'string',
+ 'label': 'string', 'value': 'string', 'deviceId': 'string'},
+ 'remove': False}, 'time': 'rfc3339', 'type': 'INITIAL'}}
+ '''
+ msg = 'Tag.V2 Resource APIs are supported from 2021.2.0 or newer.'
+ # For on-prem check the version as it is only supported from 2021.2.0+
+ if self.cvp_version_compare('>=', 6.0, msg):
+ tag_url = '/api/resources/tag/v2/TagAssignmentConfig/all'
+ payload = {
+ "partialEqFilter": [
+ {
+ "key": {
+ "workspace_id": workspace_id
+ }
+ }
+ ]
+ }
+ self.log.debug('v6 ' + tag_url + ' ' + str(payload))
+ return, data=payload)
+ def tag_config(self, element_type, workspace_id, tag_label, tag_value, remove=False):
+ ''' Create/Delete device or interface tags.
+ Tag creation with the tag.v2 resource API has to be done within a workspace.
+ Args:
+ element_type (str): Can be ELEMENT_TYPE_DEVICE or ELEMENT_TYPE_INTERFACE to
+ create device and interface tag respectively.
+ workspace_id(str): The ID of the workspace.
+ This should be generated by the create_workspace() API call.
+ tag_label(str): the label of the desired tag
+ tag_value(str): the value of the desired tag
+ remove (Boolean): When set to True it will remove the device/interface tag.
+ When set to False (default) it will create the device/interface tag.
+ Returns:
+ response (dict): A dict that contains...
+ Ex: {'value': {'key': {'workspaceId': 'string', 'elementType': 'string',
+ 'label': 'string', 'value': 'string'}},
+ 'time': 'rfc3339 time'}
+ '''
+ msg = 'Tag.V2 Resource APIs are supported from 2021.2.0 or newer.'
+ # For on-prem check the version as it is only supported from 2021.2.0+
+ if self.cvp_version_compare('>=', 6.0, msg):
+ tag_url = '/api/resources/tag/v2/TagConfig'
+ payload = {
+ "key": {
+ "elementType": element_type,
+ "workspaceId": workspace_id,
+ "label": tag_label,
+ "value": tag_value
+ },
+ "remove": remove
+ }
+ self.log.debug('v6 {} '.format(tag_url) + str(payload))
+ return, data=payload)
+ def tag_assignment_config(self, element_type, workspace_id, tag_label,
+ tag_value, device_id, interface_id, remove=False):
+ ''' Assign/Unassign device or interface tags.
+ Tag assignment with the tag.v2 resource API has to be done within a workspace.
+ Args:
+ element_type (str): can be ELEMENT_TYPE_DEVICE or ELEMENT_TYPE_INTERFACE to
+ create device and interface tag respectively
+ workspace_id(str): the ID of the workspace. This should be generated by
+ the create_workspace() API call.
+ tag_label(str): the label of the desired tag
+ tag_value(str): the value of the desired tag
+ device_id (str): the Serial Number of the device
+ interface_id (str): the interface name of the device, e.g.: Ethernet1
+ remove (Boolean): When set to True it will remove the device/interface
+ tag assignment.
+ When set to False (default) it will create the device/interface tag assignment.
+ Returns:
+ response (dict): A dict that contains...
+ Ex: {'value': {'key': {'workspaceId': 'string', 'elementType': 'string',
+ 'label': 'string', 'value': 'string',
+ 'deviceId': 'string', 'interfaceId': 'string'},
+ 'remove': Boolean},'time': 'rfc3339 time'}
+ '''
+ msg = 'Tag.V2 Resource APIs are supported from 2021.2.0 or newer.'
+ # For on-prem check the version as it is only supported from 2021.2.0+
+ if self.cvp_version_compare('>=', 6.0, msg):
+ tag_url = '/api/resources/tag/v2/TagAssignmentConfig'
+ payload = {
+ "key": {
+ "elementType": element_type,
+ "workspaceId": workspace_id,
+ "label": tag_label,
+ "value": tag_value,
+ "deviceId": device_id,
+ "interfaceId": interface_id
+ },
+ "remove": remove
+ }
+ self.log.debug('v6 {} '.format(tag_url) + str(payload))
+ return, data=payload)
+ def get_all_workspaces(self):
+ ''' Get state information for all workspaces
+ Returns:
+ response (dict): A dict that contains a list of key-values for workspaces
+ '''
+ msg = 'Workspace Resource APIs are supported from 2021.2.0 or newer.'
+ # For on-prem check the version as it is only supported from 2021.2.0+
+ if self.cvp_version_compare('>=', 6.0, msg):
+ workspace_url = '/api/resources/workspace/v1/Workspace/all'
+ payload = {}
+ self.log.debug('v6 {}'.format(workspace_url))
+ return, data=payload)
+ def get_workspace(self, workspace_id):
+ ''' Get state information for all workspaces
+ Returns:
+ response (dict): A dict that contains a list of key-values for workspaces
+ '''
+ msg = 'Workspace Resource APIs are supported from 2021.2.0 or newer.'
+ # For on-prem check the version as it is only supported from 2021.2.0+
+ if self.cvp_version_compare('>=', 6.0, msg):
+ workspace_url = '/api/resources/workspace/v1/Workspace?key.workspaceId={}'.format(
+ workspace_id)
+ self.log.debug('v6 {}'.format(workspace_url))
+ return self.clnt.get(workspace_url)
+ def workspace_config(self, workspace_id, display_name,
+ description='', request='REQUEST_UNSPECIFIED',
+ request_id=''):
+ ''' Create, Build and Submit workspaces.
+ Args:
+ workspace_id (str): The (unique) name of the workspace.
+ Previously used names cannot be used if the workspace was closed or abandoned.
+ display_name (str): The display name of the workspace.
+ description (str): The description of the workspace.
+ request (string): Can have the following values:
+ request_id (str): An arbitrary requestId that is required for the
+ build and submit process.
+ Returns:
+ response (dict): A dict that contains...
+ Ex: {'value': {'key': {'workspaceId': 'string'},
+ 'displayName': 'string','description': 'string',
+ 'requestParams': {'requestId': 'string'}
+ },
+ 'time': 'rfc3339 time'}
+ '''
+ msg = 'Workspace Resource APIs are supported from 2021.2.0 or newer.'
+ # For on-prem check the version as it is only supported from 2021.2.0+
+ if self.cvp_version_compare('>=', 6.0, msg):
+ workspace_url = '/api/resources/workspace/v1/WorkspaceConfig'
+ payload = {
+ "key": {
+ "workspaceId": workspace_id
+ },
+ "displayName": display_name,
+ "description": description,
+ "request": request,
+ "requestParams": {
+ "requestId": request_id
+ }
+ }
+ self.log.debug('v6 ' + str(workspace_url) + ' ' + str(payload))
+ return, data=payload)
+ def workspace_build_status(self, workspace_id, build_id):
+ ''' Verify the state of the workspace build process.
+ Args:
+ workspace_id (str): The (unique) name of the workspace.
+ build_id (str): The buildId of the workspace for which the
+ build process was requested.
+ Returns:
+ response (dict): A dict that contains...
+ Ex: {'value': {'key': {'workspaceId': 'string', 'buildId': 'string'},
+ 'state': 'BUILD_STATE_SUCCESS', 'buildResults': {'values': ...
+ '''
+ msg = 'Workspace Resource APIs are supported from 2021.2.0 or newer.'
+ # For on-prem check the version as it is only supported from 2021.2.0+
+ if self.cvp_version_compare('>=', 6.0, msg):
+ params = 'key.workspaceId={}&key.buildId={}'.format(workspace_id, build_id)
+ workspace_url = '/api/resources/workspace/v1/WorkspaceBuild?' + params
+ self.log.debug('v6 {}'.format(workspace_url + params))
+ return self.clnt.get(workspace_url, timeout=self.request_timeout)
+ def change_control_get_one(self, cc_id, cc_time=None):
+ ''' Get the configuration and status of a change control using Resource APIs.
+ Supported versions: CVP 2021.2.0 or newer and CVaaS.
+ Args:
+ cc_id (str): The ID of the change control.
+ cc_time (str): Time indicates the time for which you are interested in the data.
+ If no time is given, the server will use the time at which it makes the request.
+ The time format is RFC 3339, e.g.: 2021-12-24T11:30:00.00Z.
+ Returns:
+ response (dict): A dict that contains...
+ Ex: {"value":{"key":{"id":"rL6Tog6UU"}, "change":{"name":"Change 20211213_210554",
+ "rootStageId":"kZUWqyIArD",
+ "stages":{"values":{"kZUWqyIArD":{"name":"Change 20211213_210554 Root",
+ "rows":{"values":[{"values":["vazWhKyVRR"]}]}},
+ "vazWhKyVRR":{"name":"Update Config",
+ "action":{"name":"task", "timeout":3000,
+ "args":{"values":{"TaskID":"538"}}}}}},
+ "notes":"", "time":"2021-12-13T21:05:58.813750128Z", "user":"cvpadmin"},
+ "approve":{"value":true, "time":"2021-12-13T21:11:26.788753264Z",
+ "user":"cvpadmin"}}, "time":"2021-12-13T21:11:26.788753264Z"}%
+ '''
+ msg = 'Change Control Resource APIs are supported from 2021.2.0 or newer.'
+ # For on-prem check the version as it is only supported from 2021.2.0+
+ if self.cvp_version_compare('>=', 6.0, msg):
+ if cc_time is None:
+ params = '{}'.format(cc_id)
+ else:
+ params = '{}&time={}'.format(cc_id, cc_time)
+ cc_url = '/api/resources/changecontrol/v1/ChangeControl?' + params
+ self.log.debug('v6 {}'.format(cc_url))
+ try:
+ response = self.clnt.get(cc_url, timeout=self.request_timeout)
+ except Exception as error:
+ if 'resource not found' in str(error):
+ return None
+ raise error
+ return response
+ def change_control_get_all(self):
+ ''' Get the configuration and status of all Change Controls using Resource APIs.
+ Supported versions: CVP 2021.2.0 or newer and CVaaS.
+ Returns:
+ response (dict): A dict that contains a list of all Change Controls.
+ '''
+ msg = 'Change Control Resource APIs are supported from 2021.2.0 or newer.'
+ # For on-prem check the version as it is only supported from 2021.2.0+
+ if self.cvp_version_compare('>=', 6.0, msg):
+ cc_url = '/api/resources/changecontrol/v1/ChangeControl/all'
+ self.log.debug('v6 {}'.format(cc_url))
+ return self.clnt.get(cc_url, timeout=self.request_timeout)
+ def change_control_approval_get_one(self, cc_id, cc_time=None):
+ ''' Get the state of a specific Change Control's approve config using Resource APIs.
+ Supported versions: CVP 2021.2.0 or newer and CVaaS.
+ Args:
+ cc_id (str): The ID of the change control.
+ cc_time (str): Time indicates the time for which you are interested in the data.
+ If no time is given, the server will use the time at which it makes the request.
+ The time format is RFC 3339, e.g.: 2021-12-24T11:30:00.00Z.
+ Returns:
+ response (dict): A dict that contains...
+ Ex: {'value': {'key': {'id': '<CC ID>'}, 'approve': {'value': True},
+ 'version': '2021-12-13T21:05:58.813750128Z'},
+ 'time': '2021-12-13T21:11:26.788753264Z'}
+ '''
+ msg = 'Change Control Resource APIs are supported from 2021.2.0 or newer.'
+ # For on-prem check the version as it is only supported from 2021.2.0+
+ if self.cvp_version_compare('>=', 6.0, msg):
+ if cc_time is None:
+ params = '{}'.format(cc_id)
+ else:
+ params = '{}&time={}'.format(cc_id, cc_time)
+ cc_url = '/api/resources/changecontrol/v1/ApproveConfig?' + params
+ cc_status = self.change_control_get_one(cc_id)
+ if cc_status is None:
+ return None
+ if 'value' in cc_status and 'approve' not in cc_status['value']:
+ self.log.warning("The change has not been approved yet."
+ " A change has to be approved at least once for the 'approve'"
+ " state to be populated.")
+ return None
+ return self.clnt.get(cc_url, timeout=self.request_timeout)
+ def change_control_approval_get_all(self):
+ ''' Get state information for all Change Control Approvals using Resource APIs.
+ Supported versions: CVP 2021.2.0 or newer and CVaaS.
+ Returns:
+ response (dict): A dict that contains a list of all Change Control Approval Configs.
+ '''
+ msg = 'Change Control Resource APIs are supported from 2021.2.0 or newer.'
+ # For on-prem check the version as it is only supported from 2021.2.0+
+ if self.cvp_version_compare('>=', 6.0, msg):
+ cc_url = '/api/resources/changecontrol/v1/ApproveConfig/all'
+ self.log.debug('v6 {}'.format(cc_url))
+ return self.clnt.get(cc_url, timeout=self.request_timeout)
+ def change_control_approve(self, cc_id, notes="", approve=True):
+ ''' Approve/Unapprove a change control using Resource APIs.
+ Supported versions: CVP 2021.2.0 or newer and CVaaS.
+ Args:
+ cc_id (str): The ID of the change control.
+ notes (str): An optional approval note.
+ approve (bool): Set to True to approve a change and to False to unapprove a change.
+ The default is True.
+ '''
+ cc_url = '/api/resources/changecontrol/v1/ApproveConfig'
+ # For on-prem check the version as it is only supported from 2021.2.0+
+ # Since the get_change_control already checks this, no need to check it again
+ cc_status = self.change_control_get_one(cc_id)
+ if cc_status is None:
+ return None
+ if ('value' in cc_status and 'change' in cc_status['value'] and
+ 'time' in cc_status['value']['change']):
+ version = cc_status['value']['change']['time']
+ else:
+ self.log.error('The version timestamp was not found in the CC status.')
+ return None
+ payload = {
+ "key": {
+ "id": cc_id
+ },
+ "approve": {
+ "value": approve,
+ "notes": notes
+ },
+ "version": version
+ }
+ return, data=payload, timeout=self.request_timeout)
+ def change_control_delete(self, cc_id):
+ ''' Delete a pending Change Control using Resource APIs.
+ Supported versions: CVP 2021.2.0 or newer and CVaaS.
+ Args:
+ cc_id (str): The ID of the change control.
+ '''
+ msg = 'Change Control Resource APIs are supported from 2021.2.0 or newer.'
+ # For on-prem check the version as it is only supported from 2021.2.0+
+ if self.cvp_version_compare('>=', 6.0, msg):
+ params = '{}'.format(cc_id)
+ cc_url = '/api/resources/changecontrol/v1/ChangeControlConfig?' + params
+ self.log.debug('v6 {}'.format(cc_url))
+ return self.clnt.delete(cc_url, timeout=self.request_timeout)
+ def change_control_create_with_custom_stages(self, custom_cc=None):
+ ''' Create a Change Control with custom stage hierarchy using Resource APIs.
+ Supported versions: CVP 2021.2.0 or newer and CVaaS.
+ Args:
+ custom_cc (dict): A dictionary with the entire stage hierarchy.
+ Ex1: {'key': {'id': '409b94d1-c0cb-4d74-8f88-89f66f13f109'},
+ 'change': {'name': 'Change_20211217_034338',
+ 'notes': 'cvprac CC',
+ 'rootStageId': 'root',
+ 'stages': {'values': {'root': {'name': 'root',
+ 'rows': {'values': [{'values': ['1-2']},
+ {'values': ['3']}]}},
+ '1-2': {'name': 'stages 1-2',
+ 'rows': {'values': [{'values': ['1ab']},
+ {'values': ['2']}]}},
+ '1a': {'action': {'args': {'values': {'TaskID': '1242'}},
+ 'name': 'task',
+ 'timeout': 3000},
+ 'name': 'stage 1a'},
+ '1ab': {'name': 'stage 1ab',
+ 'rows': {'values': [{'values': ['1a',
+ '1b']}]}},
+ '1b': {'action': {'args': {'values': {'TaskID': '1243'}},
+ 'timeout': 3000},
+ 'name': 'stage 1b'},
+ '2': {'action': {'args': {'values': {'TaskID': '1240'}},
+ 'name': 'task',
+ 'timeout': 3000},
+ 'name': 'stage 2'},
+ '3': {'action': {'args': {'values': {'TaskID': '1241'}},
+ 'name': 'task',
+ 'timeout': 3000},
+ 'name': 'stage 3'},
+ }}}}
+ The above would result in the following hierarchy:
+ root (series)
+ |- stages 1-2 (series)
+ | |- stage 1ab (parallel)
+ | | |- stage 1a
+ | | |- stage 1b
+ | |- stage 2
+ |- stage 3
+ Ex2 (MLAG ISSU):
+ {'key': {'id': 'PXs9cKimC'},
+ 'change': {'name': 'Change 20211217_040530',
+ 'notes': '',
+ 'rootStageId': 'root',
+ 'stages': {'values': { 'root': {'name': 'Change 20211217_040530 Root',
+ 'rows': {
+ 'values': [{'values': ['left-leafs']}],
+ }},
+ 'upgrade1': {'action': {
+ 'args': {'values': {'TaskID': '1242'}},
+ 'name': 'task',
+ 'timeout': 3000},
+ 'name': 'Image Upgrade'},
+ 'pre-mlag-check-l2': {'action': {
+ 'args': {
+ 'values': {
+ 'DeviceID': 'SN2'}},
+ 'name': 'mlaghealthcheck'},
+ 'name': 'Check MLAG Health'},
+ 'left-leafs': {'name': 'left-leafs',
+ 'rows': {
+ 'values': [{'values': ['leaf1']},
+ {'values': ['leaf2']}]}},
+ 'upgrade2': {'action': {'args': {
+ 'values': {'TaskID': '1243'}},
+ 'name': 'task',
+ 'timeout': 3000},
+ 'name': 'Image Upgrade'},
+ 'pre-mlag-check-l1': {'action': {
+ 'args': {
+ 'values': {
+ 'DeviceID': 'SN1'}},
+ 'name': 'mlaghealthcheck'},
+ 'name': 'Check MLAG Health'},
+ 'post-mlag-check-l2': {'action': {
+ 'args': {
+ 'values': {
+ 'DeviceID': 'SN1'}},
+ 'name': 'mlaghealthcheck'},
+ 'name': 'Check MLAG Health'},
+ 'leaf1': {'name': 'leaf1',
+ 'rows': {
+ 'values': [{'values': [
+ 'pre-mlag-check-l1']},
+ {'values': [
+ 'upgrade1']},
+ {'values': [
+ 'post-mlag-check-l1'],
+ }]}},
+ 'post-mlag-check-l1': {'action': {
+ 'args': {
+ 'values': {
+ 'DeviceID': 'SN2'}},
+ 'name': 'mlaghealthcheck'},
+ 'name': 'Check MLAG Health'},
+ 'leaf2': {'name': 'leaf2',
+ 'rows': {'values': [{'values': [
+ 'pre-mlag-check-l2']},
+ {'values': [
+ 'upgrade2']},
+ {'values': [
+ 'post-mlag-check-l2'],
+ }]}}}}}
+ }
+ The above would result in the following hierarchy:
+ root (series)
+ |- left-leafs (series)
+ |- leaf1 (series)
+ | |- pre-mlag-check-l1
+ | |- upgrade1
+ | |- post-mlag-check-l1
+ |- leaf2 (series)
+ |- pre-mlag-check-l1
+ |- upgrade1
+ |- post-mlag-check-l1
+ Returns:
+ response (dict): A dict that contains...
+ Ex: {'value': {'key': {'id':cc_id,
+ 'time': '...'}
+ '''
+ msg = 'Change Control Resource APIs are supported from 2021.2.0 or newer.'
+ # For on-prem check the version as it is only supported from 2021.2.0+
+ if self.cvp_version_compare('>=', 6.0, msg):
+ payload = custom_cc
+ cc_url = '/api/resources/changecontrol/v1/ChangeControlConfig'
+ self.log.debug('v6 ' + str(cc_url) + ' ' + str(payload))
+ return, data=payload)
+ def change_control_create_for_tasks(self, cc_id, name, tasks, series=True):
+ ''' Create a simple Change Control for tasks using Resource APIs.
+ Supported versions: CVP 2021.2.0 or newer and CVaaS.
+ This function will create a change with either all Task actions in series or parallel.
+ For custom stage hierarchy the change_control_create_with_custom_stages()
+ should be used.
+ Args:
+ cc_id (string): The ID for the new change control.
+ name (string): The name for the new change control.
+ tasks (list): A list of Task IDs as strings
+ Ex: ['10', '11', '12']
+ series (bool): A flag for running tasks in series or
+ in parallel. Defaults to True for running in series.
+ Returns:
+ response (dict): A dict that contains...
+ Ex: {'value': {'key': {'id':cc_id,
+ 'time': '...'}
+ '''
+ stages = {'values': {'root': {'name': 'root', 'rows': {'values': []}}}}
+ if series:
+ for index, task in enumerate(tasks):
+ stage_id = 'stage%d' % index
+ stages['values']['root']['rows']['values'].append({'values': [stage_id]})
+ stages['values'][stage_id] = {
+ 'action': {
+ 'args': {
+ 'values': {
+ 'TaskID': task,
+ },
+ },
+ 'name': 'task',
+ 'timeout': 3000,
+ },
+ 'name': stage_id,
+ }
+ else:
+ stages['values']['root']['rows']['values'].append({'values': []})
+ for index, task in enumerate(tasks):
+ stage_id = 'stage%d' % index
+ stages['values']['root']['rows']['values'][0]['values'].append(stage_id)
+ stages['values'][stage_id] = {
+ 'action': {
+ 'args': {
+ 'values': {
+ 'TaskID': task,
+ },
+ },
+ 'name': 'task',
+ 'timeout': 3000,
+ },
+ 'name': stage_id,
+ }
+ msg = 'Change Control Resource APIs are supported from 2021.2.0 or newer.'
+ # For on-prem check the version as it is only supported from 2021.2.0+
+ if self.cvp_version_compare('>=', 6.0, msg):
+ payload = {
+ 'key': {
+ 'id': cc_id
+ },
+ 'change': {
+ 'name': name,
+ 'rootStageId': 'root',
+ 'notes': 'randomString',
+ 'stages': stages
+ }
+ }
+ cc_url = '/api/resources/changecontrol/v1/ChangeControlConfig'
+ self.log.debug('v6 ' + str(cc_url) + ' ' + str(payload))
+ return, data=payload, timeout=self.request_timeout)
+ def change_control_start(self, cc_id, notes=""):
+ ''' Start a Change Control using Resource APIs.
+ Supported versions: CVP 2021.2.0 or newer and CVaaS.
+ Args:
+ cc_id (string): The ID for the new change control.
+ notes (string): An optional note.
+ Returns:
+ response (dict): A dict that contains...
+ Ex: {"value":{"key":{"id":cc_id}, "start":{"value":true, "notes":"note"}},
+ "time":"2021-12-14T21:02:21.830306071Z"}
+ '''
+ msg = 'Change Control Resource APIs are supported from 2021.2.0 or newer.'
+ # For on-prem check the version as it is only supported from 2021.2.0+
+ if self.cvp_version_compare('>=', 6.0, msg):
+ payload = {
+ "key": {
+ "id": cc_id
+ },
+ "start": {
+ "value": True,
+ "notes": notes
+ }
+ }
+ cc_url = '/api/resources/changecontrol/v1/ChangeControlConfig'
+ self.log.debug('v6 ' + str(cc_url) + ' ' + str(payload))
+ return, data=payload, timeout=self.request_timeout)
+ def change_control_stop(self, cc_id, notes=""):
+ ''' Stop a Change Control using Resource APIs.
+ Supported versions: CVP 2021.2.0 or newer and CVaaS.
+ Args:
+ cc_id (string): The ID for the new change control.
+ notes (string): An optional note.
+ Returns:
+ response (dict): A dict that contains...
+ Ex: {"value":{"key":{"id":cc_id}, "start":{"value":false, "notes":"note"}},
+ "time":"2021-12-14T21:02:21.830306071Z"}
+ '''
+ msg = 'Change Control Resource APIs are supported from 2021.2.0 or newer.'
+ # For on-prem check the version as it is only supported from 2021.2.0+
+ if self.cvp_version_compare('>=', 6.0, msg):
+ payload = {
+ "key": {
+ "id": cc_id
+ },
+ "start": {
+ "value": False,
+ "notes": notes
+ }
+ }
+ cc_url = '/api/resources/changecontrol/v1/ChangeControlConfig'
+ self.log.debug('v6 ' + str(cc_url) + ' ' + str(payload))
+ return, data=payload, timeout=self.request_timeout)
+ def change_control_schedule(self, cc_id, schedule_time, notes=""):
+ ''' Schedule a Change Control using Resource APIs.
+ Supported versions: CVP 2022.1.0 or newer and CVaaS.
+ Args:
+ cc_id (string): The ID for the new change control.
+ schedule_time (string): rfc3339 time format, e.g: 2021-12-23T02:07:00.0Z
+ notes (string): An optional note.
+ Returns:
+ response (dict): A dict that contains...
+ Ex: {"value":{"key":{"id":"5821c7c1-e276-4387-b60a"},
+ "schedule":{"value":"2021-12-23T02:07:00Z",
+ "notes":"CC schedule via curl"}},
+ "time":"2021-12-23T02:06:18.739965204Z"}
+ '''
+ msg = 'Change Control Scheduling via Resource APIs are supported from 2022.1.0 or newer.'
+ # For on-prem check the version as it is only supported from 2022.1.0+
+ if self.cvp_version_compare('>=', 8.0, msg):
+ payload = {
+ "key": {
+ "id": cc_id
+ },
+ "schedule": {
+ "value": schedule_time,
+ "notes": notes
+ }
+ }
+ cc_url = '/api/resources/changecontrol/v1/ChangeControlConfig'
+ self.log.debug('v8 ' + str(cc_url) + ' ' + str(payload))
+ return, data=payload, timeout=self.request_timeout)
+ def device_decommissioning(self, device_id, request_id):
+ ''' Decommission a device using Resource APIs.
+ Supported versions: CVP 2021.3.0 or newer and CVaaS.
+ Args:
+ device_id (string): Serial Number of the device.
+ request_id (string): Key identifies the request to decommission the device.
+ Recommended to generate uuid with str(uuid.uuid4()).
+ Returns:
+ response (dict): Returns None if the device is not found else returns A dict that contains...
+ Ex: {'value': {'key': {'requestId': '4a4ba5a2-9886-4cd5-84d6-bdaf85a9f091'},
+ 'deviceId': 'BAD032986065E8DC14CBB6472EC314A6'},
+ 'time': '2022-02-12T02:58:30.765459650Z'}
+ '''
+ device_info = self.get_device_by_serial(device_id)
+ if device_info is not None and 'serialNumber' in device_info:
+ msg = 'Decommissioning via Resource APIs are supported from 2021.3.0 or newer.'
+ # For on-prem check the version as it is only supported from 2021.3.0+
+ if self.cvp_version_compare('>=', 7.0, msg):
+ payload = {
+ "key": {
+ "request_id": request_id
+ },
+ "device_id": device_id
+ }
+ url = '/api/resources/inventory/v1/DeviceDecommissioningConfig'
+ self.log.debug('v7 ' + str(url) + ' ' + str(payload))
+ return, data=payload, timeout=self.request_timeout)
+ else:
+ self.log.warning(
+ 'Device with %s serial number does not exist (or is not registered) to decommission'
+ % device_id)
+ return None
+ def device_decommissioning_status_get_one(self, request_id):
+ ''' Get the decommission status of a device using Resource APIs.
+ Supported versions: CVP 2021.3.0 or newer and CVaaS.
+ Args:
+ request_id (string): key identifies the request to decommission the device
+ Returns:
+ response (dict): A dict that contains...
+ Ex:{"result":{"value":{"key":{"requestId":"123456789"},
+ "statusMessage":"Disabled TerminAttr, waiting for device to be marked inactive"},
+ "time":"2022-02-04T19:41:46.376310308Z","type":"INITIAL"}}
+ '''
+ msg = 'Decommissioning via Resource APIs are supported from 2021.3.0 or newer.'
+ # For on-prem check the version as it is only supported from 2021.3.0+
+ if self.cvp_version_compare('>=', 7.0, msg):
+ params = 'key.requestId={}'.format(request_id)
+ url = '/api/resources/inventory/v1/DeviceDecommissioning?' + params
+ self.log.debug('v7 ' + str(url))
+ return self.clnt.get(url, timeout=self.request_timeout)
+ def device_decommissioning_status_get_all(self, status="DECOMMISSIONING_STATUS_UNSPECIFIED"):
+ ''' Get the decommissioning status of all devices using Resource APIs.
+ Supported versions: CVP 2021.3.0 or newer and CVaaS.
+ Args:
+ status (enum): By default it will get the decommissioning status for all devices.
+ Possible values:
+ Returns:
+ response (dict): A dict that contains...
+ Ex: {"result":{"value":{"key":{"requestId":"123456789"},
+ "statusMessage":"Disabled TerminAttr, waiting for device to be marked inactive"},
+ "time":"2022-02-04T19:41:46.376310308Z","type":"INITIAL"}}
+ '''
+ msg = 'Decommissioning via Resource APIs are supported from 2021.3.0 or newer.'
+ # For on-prem check the version as it is only supported from 2021.3.0+
+ if self.cvp_version_compare('>=', 7.0, msg):
+ payload = {
+ "partialEqFilter": [
+ {
+ "status": status,
+ }
+ ]
+ }
+ url = '/api/resources/inventory/v1/DeviceDecommissioning/all'
+ self.log.debug('v7 ' + str(url))
+ return, data=payload, timeout=self.request_timeout)
+ def add_role(self, name, description, moduleList):
+ ''' Add new local role to the CVP UI.
+ Args:
+ name (str): local role name on CVP
+ description (str): role description
+ moduleList (list): list of modules (name (str) and mode (str))
+ '''
+ data = {"name": name,
+ "description": description,
+ "moduleList": moduleList}
+ return'/role/', data=data,
+ timeout=self.request_timeout)
+ def update_role(self, rolekey, name, description, moduleList):
+ ''' Updates role information, like
+ role name, description and role modules.
+ Args:
+ rolekey (str): local role key on CVP
+ name (str): local role name on CVP
+ description (str): role description
+ moduleList (list): list of modules (name (str) and mode (str))
+ '''
+ data = {"key": rolekey,
+ "name": name,
+ "description": description,
+ "moduleList": moduleList}
+ return'/role/', data=data,
+ timeout=self.request_timeout)
+ def get_role(self, rolekey):
+ ''' Returns specified role information.
+ Args:
+ rolekey (str): role key on CVP
+ Returns:
+ response (dict): Returns a dict that contains the role.
+ Ex: {'name': 'Test Role', 'key': 'role_1599019487020581247', 'description': 'Test'...}
+ '''
+ return self.clnt.get('/role/{}'.format(rolekey),
+ timeout=self.request_timeout)
+ def get_roles(self):
+ ''' Get all the user roles in CloudVision.
+ Returns:
+ response (dict): Returns a dict that contains all the user role states..
+ Ex: {'total': 7, 'roles': [{'name': 'Test Role', 'key': 'role_1599019487020581247',
+ 'description': 'Test'...}]}
+ '''
+ url = '/role/'
+ return self.clnt.get(url, timeout=self.request_timeout)
+ def delete_role(self, rolekey):
+ ''' Remove specified role from CVP
+ Args:
+ rolekey (str): role key on CVP
+ '''
+ data = [rolekey]
+ return self.delete_roles(data)
+ def delete_roles(self, rolekeys):
+ ''' Remove specified roles from CVP
+ Args:
+ rolekeys (list): list of role keys (str) on CVP
+ '''
+ return'/role/', data=rolekeys,
+ timeout=self.request_timeout)
+ def svc_account_token_get_all(self):
+ ''' Get all service account token states using Resource APIs.
+ Supported versions: CVP 2021.3.0 or newer and CVaaS.
+ Returns:
+ response (list): Returns a list of dictionaries that contains...
+ Ex: [{'value': {'key': {'id': 'randomId'}, 'user': 'string',
+ 'description': 'string','valid_until': '2022-11-02T06:58:53Z',
+ 'created_by': 'string', 'last_used': None},
+ 'time': '2022-05-03T15:38:53.725014447Z', 'type': 'INITIAL'}, ...]
+ '''
+ msg = 'Service Account Resource APIs are supported from 2021.3.0+.'
+ if self.cvp_version_compare('>=', 7.0, msg):
+ url = '/api/v3/services/arista.serviceaccount.v1.TokenService/GetAll'
+ self.log.debug('v7 {}'.format(url))
+ return
+ def svc_account_token_get_one(self, token_id):
+ ''' Get a service account token's state using Resource APIs
+ Supported versions: CVP 2021.3.0 or newer and CVaaS.
+ Returns:
+ response (list): Returns a list of dict that contains...
+ Ex: [{'value': {'key': {'id': 'randomId'}, 'user': 'string',
+ 'description': 'string', 'valid_until': '2022-11-02T06:58:53Z',
+ 'created_by': 'cvpadmin', 'last_used': None},
+ 'time': '2022-05-03T15:38:53.725014447Z', 'type': 'INITIAL'}]
+ '''
+ msg = 'Service Account Resource APIs are supported from 2021.3.0+.'
+ if self.cvp_version_compare('>=', 7.0, msg):
+ payload = {"key": {"id": token_id}}
+ url = '/api/v3/services/arista.serviceaccount.v1.TokenService/GetOne'
+ self.log.debug('v7 {} {}'.format(url, payload))
+ return, data=payload)
+ def svc_account_token_delete(self, token_id):
+ ''' Delete a service account token using Resource APIs.
+ Supported versions: CVP 2021.3.0 or newer and CVaaS.
+ Args:
+ token_id (string): The id of the service account token.
+ Returns:
+ response (list): Returns a list of dict that contains the time of deletion:
+ Ex: [{'key': {'id': '<token_id>'},
+ 'time': '2022-07-26T15:29:03.687167871Z'}]
+ '''
+ msg = 'Service Account Resource APIs are supported from 2021.3.0+.'
+ if self.cvp_version_compare('>=', 7.0, msg):
+ payload = {"key": {"id": token_id}}
+ url = '/api/v3/services/arista.serviceaccount.v1.TokenConfigService/Delete'
+ self.log.debug('v7 {} {}'.format(url, payload))
+ return, data=payload)
+ def svc_account_token_set(self, username, duration, description):
+ ''' Create a service account token using Resource APIs.
+ Supported versions: CVP 2021.3.0 or newer and CVaaS.
+ Args:
+ username (string): The service account username for which the token will be
+ generated.
+ duration (string): The validity of the service account in seconds e.g.: "20000s"
+ The maximum value is 1 year in seconds e.g.: "31536000s"
+ description (string): The description of the service account token.
+ Returns:
+ response (list): Returns a list of dict that contains the token:
+ Ex: [{'value': {'key': {'id': '<userId>'}, 'user': 'ansible',
+ 'description': 'cvprac test',
+ 'valid_for': '550s', 'token': '<ey...>'}]
+ '''
+ payload = {'value': {'description': description,
+ 'user': username,
+ 'valid_for': duration}}
+ msg = 'Service Account Resource APIs are supported from 2021.3.0+.'
+ if self.cvp_version_compare('>=', 7.0, msg):
+ url = '/api/v3/services/arista.serviceaccount.v1.TokenConfigService/Set'
+ self.log.debug('v7 {} {}'.format(url, payload))
+ return, data=payload)
+ def svc_account_get_all(self):
+ ''' Get all service account states using Resource APIs.
+ Supported versions: CVP 2021.3.0 or newer and CVaaS.
+ Returns:
+ response (list): Returns a list of dictionaries that contains...
+ Ex: [{'value': {'key': {'name': 'ansible'}, 'status': 'ACCOUNT_STATUS_ENABLED',
+ 'description': 'lab-tests', 'groups': {'values': ['network-admin']}},
+ 'time': '2022-02-10T04:28:14.251684869Z', 'type': 'INITIAL'}, ...]
+ '''
+ msg = 'Service Account Resource APIs are supported from 2021.3.0+.'
+ if self.cvp_version_compare('>=', 7.0, msg):
+ url = '/api/v3/services/arista.serviceaccount.v1.AccountService/GetAll'
+ self.log.debug('v7 {} '.format(url))
+ return
+ def svc_account_get_one(self, username):
+ ''' Get a service account's state using Resource APIs
+ Supported versions: CVP 2021.3.0 or newer and CVaaS.
+ Args:
+ username (string): The service account username.
+ Returns:
+ response (list): Returns a list of dict that contains...
+ Ex: [{'value': {'key': {'name': 'ansible'}, 'status': 'ACCOUNT_STATUS_ENABLED',
+ 'description': 'lab-tests', 'groups': {'values': ['network-admin']}},
+ 'time': '2022-02-10T04:28:14.251684869Z'}]
+ '''
+ msg = 'Service Account Resource APIs are supported from 2021.3.0+.'
+ if self.cvp_version_compare('>=', 7.0, msg):
+ payload = {"key": {"name": username}}
+ url = '/api/v3/services/arista.serviceaccount.v1.AccountService/GetOne'
+ self.log.debug('v7 {} {}'.format(url, payload))
+ return, data=payload)
+ def svc_account_set(self, username, description, roles, status):
+ ''' Create a service account using Resource APIs.
+ Supported versions: CVP 2021.3.0 or newer and CVaaS.
+ Args:
+ username (string): The service account username.
+ description (string): The description of the service account.
+ roles (list): The list of role IDs. Default roles have a human readable name,
+ e.g.: 'network-admin', 'network-operator';
+ other roles will have the format of 'role_<unix_timestamp>',
+ e.g. 'role_1658850344592739349'.
+ cvprac automatically converts non-default role names to role IDs.
+ status (enum): The status of the service account. Possible values:
+ Returns:
+ response (list): Returns a list of dict that contains...
+ Ex: [{'value': {'key': {'name': 'cvprac2'}, 'status': 'ACCOUNT_STATUS_ENABLED',
+ 'description': 'testapi', 'groups': {'values':
+ ['network-admin', 'role_1658850344592739349']}},
+ 'time': '2022-07-26T18:19:55.392173445Z'}]
+ '''
+ msg = 'Service Account Resource APIs are supported from 2021.3.0+.'
+ if self.cvp_version_compare('>=', 7.0, msg):
+ role_ids = []
+ all_roles = self.get_roles()
+ for role in all_roles['roles']:
+ if role['key'] in roles or role['name'] in roles:
+ role_ids.append(role['key'])
+ if len(roles) != len(role_ids):
+ self.log.warning('Not all provided roles {} are valid. '
+ 'Only using the found valid roles {}'.format(roles, role_ids))
+ payload = {'value': {'description': description,
+ 'groups': {'values': role_ids},
+ 'key': {'name': username},
+ 'status': status}}
+ url = '/api/v3/services/arista.serviceaccount.v1.AccountConfigService/Set'
+ self.log.debug('v7 {} {}'.format(url, payload))
+ return, data=payload)
+ def svc_account_delete(self, username):
+ ''' Delete a service account using Resource APIs.
+ Supported versions: CVP 2021.3.0 or newer and CVaaS.
+ Args:
+ username (string): The service account username.
+ Returns:
+ response (list): Returns a list of dict that contains the time of deletion:
+ Ex: [{'key': {'name': 'cvprac2'},
+ 'time': '2022-07-26T18:26:53.637425846Z'}]
+ '''
+ msg = 'Service Account Resource APIs are supported from 2021.3.0+.'
+ if self.cvp_version_compare('>=', 7.0, msg):
+ payload = {"key": {"name": username}}
+ url = '/api/v3/services/arista.serviceaccount.v1.AccountConfigService/Delete'
+ self.log.debug('v7 {} {}'.format(url, payload))
+ return, data=payload)
+ def svc_account_delete_expired_tokens(self):
+ ''' Delete all service account tokens using Resource APIs.
+ Supported versions: CVP 2021.3.0 or newer and CVaaS.
+ Returns:
+ response (list): Returns a list of dict that contains the list of tokens
+ that were deleted:
+ Ex: [{'value': {'key': {'id': '091f48a2808'},'user': 'cvprac3',
+ 'description': 'cvprac test', 'valid_until': '2022-07-26T18:31:18Z',
+ 'created_by': 'cvpadmin', 'last_used': None},
+ 'time': '2022-07-26T18:30:28.022504853Z','type': 'INITIAL'},
+ {'value': {'key': {'id': '2f6325d9c'},...]
+ '''
+ tokens = self.svc_account_token_get_all()
+ expired_tokens = []
+ for tok in tokens:
+ token = tok['value']
+ if datetime.strptime(token['valid_until'], "%Y-%m-%dT%H:%M:%SZ") < datetime.utcnow():
+ self.svc_account_token_delete(token['key']['id'])
+ expired_tokens.append(tok)
+ return expired_tokens