summaryrefslogtreecommitdiffstats
path: root/eos_downloader
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2023-05-11 09:25:01 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2023-06-12 05:31:22 +0000
commit7ad1d0e0af695fa7f872b740a1bb7b2897eb41bd (patch)
tree13dd59a8ea98206a8c56ffd466f59c146f9f19c7 /eos_downloader
parentInitial commit. (diff)
downloadeos-downloader-7ad1d0e0af695fa7f872b740a1bb7b2897eb41bd.tar.xz
eos-downloader-7ad1d0e0af695fa7f872b740a1bb7b2897eb41bd.zip
Adding upstream version 0.8.1.upstream/0.8.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'eos_downloader')
-rw-r--r--eos_downloader/__init__.py47
-rw-r--r--eos_downloader/cli/__init__.py0
-rw-r--r--eos_downloader/cli/cli.py76
-rw-r--r--eos_downloader/cli/debug/__init__.py0
-rw-r--r--eos_downloader/cli/debug/commands.py53
-rw-r--r--eos_downloader/cli/get/__init__.py0
-rw-r--r--eos_downloader/cli/get/commands.py137
-rw-r--r--eos_downloader/cli/info/__init__.py0
-rw-r--r--eos_downloader/cli/info/commands.py87
-rw-r--r--eos_downloader/cvp.py276
-rw-r--r--eos_downloader/data.py93
-rw-r--r--eos_downloader/download.py77
-rw-r--r--eos_downloader/eos.py177
-rw-r--r--eos_downloader/models/__init__.py0
-rw-r--r--eos_downloader/models/version.py272
-rw-r--r--eos_downloader/object_downloader.py513
-rw-r--r--eos_downloader/tools.py13
17 files changed, 1821 insertions, 0 deletions
diff --git a/eos_downloader/__init__.py b/eos_downloader/__init__.py
new file mode 100644
index 0000000..345ccf7
--- /dev/null
+++ b/eos_downloader/__init__.py
@@ -0,0 +1,47 @@
+#!/usr/bin/python
+# coding: utf-8 -*-
+
+"""
+EOS Downloader module.
+"""
+
+from __future__ import (absolute_import, division,
+ print_function, unicode_literals, annotations)
+import dataclasses
+from typing import Any
+import json
+import importlib.metadata
+
+__author__ = '@titom73'
+__email__ = 'tom@inetsix.net'
+__date__ = '2022-03-16'
+__version__ = importlib.metadata.version("eos-downloader")
+
+# __all__ = ["CvpAuthenticationItem", "CvFeatureManager", "EOSDownloader", "ObjectDownloader", "reverse"]
+
+ARISTA_GET_SESSION = "https://www.arista.com/custom_data/api/cvp/getSessionCode/"
+
+ARISTA_SOFTWARE_FOLDER_TREE = "https://www.arista.com/custom_data/api/cvp/getFolderTree/"
+
+ARISTA_DOWNLOAD_URL = "https://www.arista.com/custom_data/api/cvp/getDownloadLink/"
+
+MSG_TOKEN_EXPIRED = """The API token has expired. Please visit arista.com, click on your profile and
+select Regenerate Token then re-run the script with the new token.
+"""
+
+MSG_TOKEN_INVALID = """The API token is incorrect. Please visit arista.com, click on your profile and
+check the Access Token. Then re-run the script with the correct token.
+"""
+
+MSG_INVALID_DATA = """Invalid data returned by server
+"""
+
+EVE_QEMU_FOLDER_PATH = '/opt/unetlab/addons/qemu/'
+
+
+class EnhancedJSONEncoder(json.JSONEncoder):
+ """Custom JSon encoder."""
+ def default(self, o: Any) -> Any:
+ if dataclasses.is_dataclass(o):
+ return dataclasses.asdict(o)
+ return super().default(o)
diff --git a/eos_downloader/cli/__init__.py b/eos_downloader/cli/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/eos_downloader/cli/__init__.py
diff --git a/eos_downloader/cli/cli.py b/eos_downloader/cli/cli.py
new file mode 100644
index 0000000..ddd0dea
--- /dev/null
+++ b/eos_downloader/cli/cli.py
@@ -0,0 +1,76 @@
+#!/usr/bin/env python
+# coding: utf-8 -*-
+# pylint: disable=no-value-for-parameter
+# pylint: disable=cyclic-import
+# pylint: disable=too-many-arguments
+# pylint: disable=unused-argument
+
+
+"""
+ARDL CLI Baseline.
+"""
+
+import click
+from rich.console import Console
+import eos_downloader
+from eos_downloader.cli.get import commands as get_commands
+from eos_downloader.cli.debug import commands as debug_commands
+from eos_downloader.cli.info import commands as info_commands
+
+
+@click.group()
+@click.pass_context
+@click.option('--token', show_envvar=True, default=None, help='Arista Token from your customer account')
+def ardl(ctx: click.Context, token: str) -> None:
+ """Arista Network Download CLI"""
+ ctx.ensure_object(dict)
+ ctx.obj['token'] = token
+
+
+@click.command()
+def version() -> None:
+ """Display version of ardl"""
+ console = Console()
+ console.print(f'ardl is running version {eos_downloader.__version__}')
+
+
+@ardl.group(no_args_is_help=True)
+@click.pass_context
+def get(ctx: click.Context) -> None:
+ # pylint: disable=redefined-builtin
+ """Download Arista from Arista website"""
+
+
+@ardl.group(no_args_is_help=True)
+@click.pass_context
+def info(ctx: click.Context) -> None:
+ # pylint: disable=redefined-builtin
+ """List information from Arista website"""
+
+
+@ardl.group(no_args_is_help=True)
+@click.pass_context
+def debug(ctx: click.Context) -> None:
+ # pylint: disable=redefined-builtin
+ """Debug commands to work with ardl"""
+
+# ANTA CLI Execution
+
+
+def cli() -> None:
+ """Load ANTA CLI"""
+ # Load group commands
+ get.add_command(get_commands.eos)
+ get.add_command(get_commands.cvp)
+ info.add_command(info_commands.eos_versions)
+ debug.add_command(debug_commands.xml)
+ ardl.add_command(version)
+ # Load CLI
+ ardl(
+ obj={},
+ auto_envvar_prefix='arista'
+ )
+
+
+if __name__ == '__main__':
+ cli()
diff --git a/eos_downloader/cli/debug/__init__.py b/eos_downloader/cli/debug/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/eos_downloader/cli/debug/__init__.py
diff --git a/eos_downloader/cli/debug/commands.py b/eos_downloader/cli/debug/commands.py
new file mode 100644
index 0000000..107b8a0
--- /dev/null
+++ b/eos_downloader/cli/debug/commands.py
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+# coding: utf-8 -*-
+# pylint: disable=no-value-for-parameter
+# pylint: disable=too-many-arguments
+# pylint: disable=line-too-long
+# pylint: disable=duplicate-code
+# flake8: noqa E501
+
+"""
+Commands for ARDL CLI to get data.
+"""
+
+import xml.etree.ElementTree as ET
+from xml.dom import minidom
+
+import click
+from loguru import logger
+from rich.console import Console
+
+import eos_downloader.eos
+
+
+@click.command()
+@click.pass_context
+@click.option('--output', default=str('arista.xml'), help='Path to save XML file', type=click.Path(), show_default=True)
+@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False))
+def xml(ctx: click.Context, output: str, log_level: str) -> None:
+ # sourcery skip: remove-unnecessary-cast
+ """Extract XML directory structure"""
+ console = Console()
+ # Get from Context
+ token = ctx.obj['token']
+
+ logger.remove()
+ if log_level is not None:
+ logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
+
+ my_download = eos_downloader.eos.EOSDownloader(
+ image='unset',
+ software='EOS',
+ version='unset',
+ token=token,
+ hash_method='sha512sum')
+
+ my_download.authenticate()
+ xml_object: ET.ElementTree = my_download._get_folder_tree() # pylint: disable=protected-access
+ xml_content = xml_object.getroot()
+
+ xmlstr = minidom.parseString(ET.tostring(xml_content)).toprettyxml(indent=" ", newl='')
+ with open(output, "w", encoding='utf-8') as f:
+ f.write(str(xmlstr))
+
+ console.print(f'XML file saved in: { output }')
diff --git a/eos_downloader/cli/get/__init__.py b/eos_downloader/cli/get/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/eos_downloader/cli/get/__init__.py
diff --git a/eos_downloader/cli/get/commands.py b/eos_downloader/cli/get/commands.py
new file mode 100644
index 0000000..13a8eec
--- /dev/null
+++ b/eos_downloader/cli/get/commands.py
@@ -0,0 +1,137 @@
+#!/usr/bin/env python
+# coding: utf-8 -*-
+# pylint: disable=no-value-for-parameter
+# pylint: disable=too-many-arguments
+# pylint: disable=line-too-long
+# pylint: disable=redefined-builtin
+# flake8: noqa E501
+
+"""
+Commands for ARDL CLI to get data.
+"""
+
+import os
+import sys
+from typing import Union
+
+import click
+from loguru import logger
+from rich.console import Console
+
+import eos_downloader.eos
+from eos_downloader.models.version import BASE_VERSION_STR, RTYPE_FEATURE, RTYPES
+
+EOS_IMAGE_TYPE = ['64', 'INT', '2GB-INT', 'cEOS', 'cEOS64', 'vEOS', 'vEOS-lab', 'EOS-2GB', 'default']
+CVP_IMAGE_TYPE = ['ova', 'rpm', 'kvm', 'upgrade']
+
+@click.command(no_args_is_help=True)
+@click.pass_context
+@click.option('--image-type', default='default', help='EOS Image type', type=click.Choice(EOS_IMAGE_TYPE), required=True)
+@click.option('--version', default=None, help='EOS version', type=str, required=False)
+@click.option('--latest', '-l', is_flag=True, type=click.BOOL, default=False, help='Get latest version in given branch. If --branch is not use, get the latest branch with specific release type')
+@click.option('--release-type', '-rtype', type=click.Choice(RTYPES, case_sensitive=False), default=RTYPE_FEATURE, help='EOS release type to search')
+@click.option('--branch', '-b', type=click.STRING, default=None, help='EOS Branch to list releases')
+@click.option('--docker-name', default='arista/ceos', help='Docker image name (default: arista/ceos)', type=str, show_default=True)
+@click.option('--output', default=str(os.path.relpath(os.getcwd(), start=os.curdir)), help='Path to save image', type=click.Path(),show_default=True)
+# Debugging
+@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False))
+# Boolean triggers
+@click.option('--eve-ng', is_flag=True, help='Run EVE-NG vEOS provisioning (only if CLI runs on an EVE-NG server)', default=False)
+@click.option('--disable-ztp', is_flag=True, help='Disable ZTP process in vEOS image (only available with --eve-ng)', default=False)
+@click.option('--import-docker', is_flag=True, help='Import docker image (only available with --image_type cEOSlab)', default=False)
+def eos(
+ ctx: click.Context, image_type: str, output: str, log_level: str, eve_ng: bool, disable_ztp: bool,
+ import_docker: bool, docker_name: str, version: Union[str, None] = None, release_type: str = RTYPE_FEATURE,
+ latest: bool = False, branch: Union[str,None] = None
+ ) -> int:
+ """Download EOS image from Arista website"""
+ console = Console()
+ # Get from Context
+ token = ctx.obj['token']
+ if token is None or token == '':
+ console.print('❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option', style="bold red")
+ sys.exit(1)
+
+ logger.remove()
+ if log_level is not None:
+ logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
+
+ console.print("🪐 [bold blue]eos-downloader[/bold blue] is starting...", )
+ console.print(f' - Image Type: {image_type}')
+ console.print(f' - Version: {version}')
+
+
+ if version is not None:
+ my_download = eos_downloader.eos.EOSDownloader(
+ image=image_type,
+ software='EOS',
+ version=version,
+ token=token,
+ hash_method='sha512sum')
+ my_download.authenticate()
+
+ elif latest:
+ my_download = eos_downloader.eos.EOSDownloader(
+ image=image_type,
+ software='EOS',
+ version='unset',
+ token=token,
+ hash_method='sha512sum')
+ my_download.authenticate()
+ if branch is None:
+ branch = str(my_download.latest_branch(rtype=release_type).branch)
+ latest_version = my_download.latest_eos(branch, rtype=release_type)
+ if str(latest_version) == BASE_VERSION_STR:
+ console.print(f'[red]Error[/red], cannot find any version in {branch} for {release_type} release type')
+ sys.exit(1)
+ my_download.version = str(latest_version)
+
+ if eve_ng:
+ my_download.provision_eve(noztp=disable_ztp, checksum=True)
+ else:
+ my_download.download_local(file_path=output, checksum=True)
+
+ if import_docker:
+ my_download.docker_import(
+ image_name=docker_name
+ )
+ console.print('✅ processing done !')
+ sys.exit(0)
+
+
+
+@click.command(no_args_is_help=True)
+@click.pass_context
+@click.option('--format', default='upgrade', help='CVP Image type', type=click.Choice(CVP_IMAGE_TYPE), required=True)
+@click.option('--version', default=None, help='CVP version', type=str, required=True)
+@click.option('--output', default=str(os.path.relpath(os.getcwd(), start=os.curdir)), help='Path to save image', type=click.Path(),show_default=True)
+@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False))
+def cvp(ctx: click.Context, version: str, format: str, output: str, log_level: str) -> int:
+ """Download CVP image from Arista website"""
+ console = Console()
+ # Get from Context
+ token = ctx.obj['token']
+ if token is None or token == '':
+ console.print('❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option', style="bold red")
+ sys.exit(1)
+
+ logger.remove()
+ if log_level is not None:
+ logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
+
+ console.print("🪐 [bold blue]eos-downloader[/bold blue] is starting...", )
+ console.print(f' - Image Type: {format}')
+ console.print(f' - Version: {version}')
+
+ my_download = eos_downloader.eos.EOSDownloader(
+ image=format,
+ software='CloudVision',
+ version=version,
+ token=token,
+ hash_method='md5sum')
+
+ my_download.authenticate()
+
+ my_download.download_local(file_path=output, checksum=False)
+ console.print('✅ processing done !')
+ sys.exit(0)
diff --git a/eos_downloader/cli/info/__init__.py b/eos_downloader/cli/info/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/eos_downloader/cli/info/__init__.py
diff --git a/eos_downloader/cli/info/commands.py b/eos_downloader/cli/info/commands.py
new file mode 100644
index 0000000..b51003b
--- /dev/null
+++ b/eos_downloader/cli/info/commands.py
@@ -0,0 +1,87 @@
+#!/usr/bin/env python
+# coding: utf-8 -*-
+# pylint: disable=no-value-for-parameter
+# pylint: disable=too-many-arguments
+# pylint: disable=line-too-long
+# pylint: disable=redefined-builtin
+# flake8: noqa E501
+
+"""
+Commands for ARDL CLI to list data.
+"""
+
+import sys
+from typing import Union
+
+import click
+from loguru import logger
+from rich.console import Console
+from rich.pretty import pprint
+
+import eos_downloader.eos
+from eos_downloader.models.version import BASE_VERSION_STR, RTYPE_FEATURE, RTYPES
+
+
+@click.command(no_args_is_help=True)
+@click.pass_context
+@click.option('--latest', '-l', is_flag=True, type=click.BOOL, default=False, help='Get latest version in given branch. If --branch is not use, get the latest branch with specific release type')
+@click.option('--release-type', '-rtype', type=click.Choice(RTYPES, case_sensitive=False), default=RTYPE_FEATURE, help='EOS release type to search')
+@click.option('--branch', '-b', type=click.STRING, default=None, help='EOS Branch to list releases')
+@click.option('--verbose', '-v', is_flag=True, type=click.BOOL, default=False, help='Human readable output. Default is none to use output in script)')
+@click.option('--log-level', '--log', help='Logging level of the command', default='warning', type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False))
+def eos_versions(ctx: click.Context, log_level: str, branch: Union[str,None] = None, release_type: str = RTYPE_FEATURE, latest: bool = False, verbose: bool = False) -> None:
+ # pylint: disable = too-many-branches
+ """
+ List Available EOS version on Arista.com website.
+
+ Comes with some filters to get latest release (F or M) as well as branch filtering
+
+ - To get latest M release available (without any branch): ardl info eos-versions --latest -rtype m
+
+ - To get latest F release available: ardl info eos-versions --latest -rtype F
+ """
+ console = Console()
+ # Get from Context
+ token = ctx.obj['token']
+
+ logger.remove()
+ if log_level is not None:
+ logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
+
+ my_download = eos_downloader.eos.EOSDownloader(
+ image='unset',
+ software='EOS',
+ version='unset',
+ token=token,
+ hash_method='sha512sum')
+
+ auth = my_download.authenticate()
+ if verbose and auth:
+ console.print('✅ Authenticated on arista.com')
+
+ if release_type is not None:
+ release_type = release_type.upper()
+
+ if latest:
+ if branch is None:
+ branch = str(my_download.latest_branch(rtype=release_type).branch)
+ latest_version = my_download.latest_eos(branch, rtype=release_type)
+ if str(latest_version) == BASE_VERSION_STR:
+ console.print(f'[red]Error[/red], cannot find any version in {branch} for {release_type} release type')
+ sys.exit(1)
+ if verbose:
+ console.print(f'Branch {branch} has been selected with release type {release_type}')
+ if branch is not None:
+ console.print(f'Latest release for {branch}: {latest_version}')
+ else:
+ console.print(f'Latest EOS release: {latest_version}')
+ else:
+ console.print(f'{ latest_version }')
+ else:
+ versions = my_download.get_eos_versions(branch=branch, rtype=release_type)
+ if verbose:
+ console.print(f'List of available versions for {branch if branch is not None else "all branches"}')
+ for version in versions:
+ console.print(f' → {str(version)}')
+ else:
+ pprint([str(version) for version in versions])
diff --git a/eos_downloader/cvp.py b/eos_downloader/cvp.py
new file mode 100644
index 0000000..6f14eb0
--- /dev/null
+++ b/eos_downloader/cvp.py
@@ -0,0 +1,276 @@
+#!/usr/bin/python
+# coding: utf-8 -*-
+
+"""
+CVP Uploader content
+"""
+
+import os
+from typing import List, Optional, Any
+from dataclasses import dataclass
+from loguru import logger
+from cvprac.cvp_client import CvpClient
+from cvprac.cvp_client_errors import CvpLoginError
+
+# from eos_downloader.tools import exc_to_str
+
+# logger = logging.getLogger(__name__)
+
+
+@dataclass
+class CvpAuthenticationItem:
+ """
+ Data structure to represent Cloudvision Authentication
+ """
+ server: str
+ port: int = 443
+ token: Optional[str] = None
+ timeout: int = 1200
+ validate_cert: bool = False
+
+
+class Filer():
+ # pylint: disable=too-few-public-methods
+ """
+ Filer Helper for file management
+ """
+ def __init__(self, path: str) -> None:
+ self.file_exist = False
+ self.filename = ''
+ self.absolute_path = ''
+ self.relative_path = path
+ if os.path.exists(path):
+ self.file_exist = True
+ self.filename = os.path.basename(path)
+ self.absolute_path = os.path.realpath(path)
+
+ def __repr__(self) -> str:
+ return self.absolute_path if self.file_exist else ''
+
+
+class CvFeatureManager():
+ """
+ CvFeatureManager Object to interect with Cloudvision
+ """
+ def __init__(self, authentication: CvpAuthenticationItem) -> None:
+ """
+ __init__ Class Creator
+
+ Parameters
+ ----------
+ authentication : CvpAuthenticationItem
+ Authentication information to use to connect to Cloudvision
+ """
+ self._authentication = authentication
+ # self._cv_instance = CvpClient()
+ self._cv_instance = self._connect(authentication=authentication)
+ self._cv_images = self.__get_images()
+ # self._cv_bundles = self.__get_bundles()
+
+ def _connect(self, authentication: CvpAuthenticationItem) -> CvpClient:
+ """
+ _connect Connection management
+
+ Parameters
+ ----------
+ authentication : CvpAuthenticationItem
+ Authentication information to use to connect to Cloudvision
+
+ Returns
+ -------
+ CvpClient
+ cvprac session to cloudvision
+ """
+ client = CvpClient()
+ if authentication.token is not None:
+ try:
+ client.connect(
+ nodes=[authentication.server],
+ username='',
+ password='',
+ api_token=authentication.token,
+ is_cvaas=True,
+ port=authentication.port,
+ cert=authentication.validate_cert,
+ request_timeout=authentication.timeout
+ )
+ except CvpLoginError as error_data:
+ logger.error(f'Cannot connect to Cloudvision server {authentication.server}')
+ logger.debug(f'Error message: {error_data}')
+ logger.info('connected to Cloudvision server')
+ logger.debug(f'Connection info: {authentication}')
+ return client
+
+ def __get_images(self) -> List[Any]:
+ """
+ __get_images Collect information about images on Cloudvision
+
+ Returns
+ -------
+ dict
+ Fact returned by Cloudvision
+ """
+ images = []
+ logger.debug(' -> Collecting images')
+ images = self._cv_instance.api.get_images()['data']
+ return images if self.__check_api_result(images) else []
+
+ # def __get_bundles(self):
+ # """
+ # __get_bundles [Not In use] Collect information about bundles on Cloudvision
+
+ # Returns
+ # -------
+ # dict
+ # Fact returned by Cloudvision
+ # """
+ # bundles = []
+ # logger.debug(' -> Collecting images bundles')
+ # bundles = self._cv_instance.api.get_image_bundles()['data']
+ # # bundles = self._cv_instance.post(url='/cvpservice/image/getImageBundles.do?queryparam=&startIndex=0&endIndex=0')['data']
+ # return bundles if self.__check_api_result(bundles) else None
+
+ def __check_api_result(self, arg0: Any) -> bool:
+ """
+ __check_api_result Check API calls return content
+
+ Parameters
+ ----------
+ arg0 : any
+ Element to test
+
+ Returns
+ -------
+ bool
+ True if data are correct False in other cases
+ """
+ logger.debug(arg0)
+ return len(arg0) > 0
+
+ def _does_image_exist(self, image_name: str) -> bool:
+ """
+ _does_image_exist Check if an image is referenced in Cloudvision facts
+
+ Parameters
+ ----------
+ image_name : str
+ Name of the image to search for
+
+ Returns
+ -------
+ bool
+ True if present
+ """
+ return any(image_name == image['name'] for image in self._cv_images) if isinstance(self._cv_images, list) else False
+
+ def _does_bundle_exist(self, bundle_name: str) -> bool:
+ # pylint: disable=unused-argument
+ """
+ _does_bundle_exist Check if an image is referenced in Cloudvision facts
+
+ Returns
+ -------
+ bool
+ True if present
+ """
+ # return any(bundle_name == bundle['name'] for bundle in self._cv_bundles)
+ return False
+
+ def upload_image(self, image_path: str) -> bool:
+ """
+ upload_image Upload an image to Cloudvision server
+
+ Parameters
+ ----------
+ image_path : str
+ Path to the local file to upload
+
+ Returns
+ -------
+ bool
+ True if succeeds
+ """
+ image_item = Filer(path=image_path)
+ if image_item.file_exist is False:
+ logger.error(f'File not found: {image_item.relative_path}')
+ return False
+ logger.info(f'File path for image: {image_item}')
+ if self._does_image_exist(image_name=image_item.filename):
+ logger.error("Image found in Cloudvision , Please delete it before running this script")
+ return False
+ try:
+ upload_result = self._cv_instance.api.add_image(filepath=image_item.absolute_path)
+ except Exception as e: # pylint: disable=broad-exception-caught
+ logger.error('An error occurred during upload, check CV connection')
+ logger.error(f'Exception message is: {e}')
+ return False
+ logger.debug(f'Upload Result is : {upload_result}')
+ return True
+
+ def build_image_list(self, image_list: List[str]) -> List[Any]:
+ """
+ Builds a list of the image data structures, for a given list of image names.
+ Parameters
+ ----------
+ image_list : list
+ List of software image names
+ Returns
+ -------
+ List:
+ Returns a list of images, with complete data or None in the event of failure
+ """
+ internal_image_list = []
+ image_data = None
+ success = True
+
+ for entry in image_list:
+ for image in self._cv_images:
+ if image["imageFileName"] == entry:
+ image_data = image
+
+ if image_data is not None:
+ internal_image_list.append(image_data)
+ image_data = None
+ else:
+ success = False
+
+ return internal_image_list if success else []
+
+ def create_bundle(self, name: str, images_name: List[str]) -> bool:
+ """
+ create_bundle Create a bundle with a list of images.
+
+ Parameters
+ ----------
+ name : str
+ Name of the bundle
+ images_name : List[str]
+ List of images available on Cloudvision
+
+ Returns
+ -------
+ bool
+ True if succeeds
+ """
+ logger.debug(f'Init creation of an image bundle {name} with following images {images_name}')
+ all_images_present: List[bool] = []
+ self._cv_images = self.__get_images()
+ all_images_present.extend(
+ self._does_image_exist(image_name=image_name)
+ for image_name in images_name
+ )
+ # Bundle Create
+ if self._does_bundle_exist(bundle_name=name) is False:
+ logger.debug(f'Creating image bundle {name} with following images {images_name}')
+ images_data = self.build_image_list(image_list=images_name)
+ if images_data is not None:
+ logger.debug('Images information: {images_data}')
+ try:
+ data = self._cv_instance.api.save_image_bundle(name=name, images=images_data)
+ except Exception as e: # pylint: disable=broad-exception-caught
+ logger.critical(f'{e}')
+ else:
+ logger.debug(data)
+ return True
+ logger.critical('No data found for images')
+ return False
diff --git a/eos_downloader/data.py b/eos_downloader/data.py
new file mode 100644
index 0000000..74f2f8e
--- /dev/null
+++ b/eos_downloader/data.py
@@ -0,0 +1,93 @@
+#!/usr/bin/python
+# coding: utf-8 -*-
+
+"""
+EOS Downloader Information to use in
+eos_downloader.object_downloader.ObjectDownloader._build_filename.
+
+Data are built from content of Arista XML file
+"""
+
+
+# [platform][image][version]
+DATA_MAPPING = {
+ "CloudVision": {
+ "ova": {
+ "extension": ".ova",
+ "prepend": "cvp",
+ "folder_level": 0
+ },
+ "rpm": {
+ "extension": "",
+ "prepend": "cvp-rpm-installer",
+ "folder_level": 0
+ },
+ "kvm": {
+ "extension": "-kvm.tgz",
+ "prepend": "cvp",
+ "folder_level": 0
+ },
+ "upgrade": {
+ "extension": ".tgz",
+ "prepend": "cvp-upgrade",
+ "folder_level": 0
+ },
+ },
+ "EOS": {
+ "64": {
+ "extension": ".swi",
+ "prepend": "EOS64",
+ "folder_level": 0
+ },
+ "INT": {
+ "extension": "-INT.swi",
+ "prepend": "EOS",
+ "folder_level": 1
+ },
+ "2GB-INT": {
+ "extension": "-INT.swi",
+ "prepend": "EOS-2GB",
+ "folder_level": 1
+ },
+ "cEOS": {
+ "extension": ".tar.xz",
+ "prepend": "cEOS-lab",
+ "folder_level": 0
+ },
+ "cEOS64": {
+ "extension": ".tar.xz",
+ "prepend": "cEOS64-lab",
+ "folder_level": 0
+ },
+ "vEOS": {
+ "extension": ".vmdk",
+ "prepend": "vEOS",
+ "folder_level": 0
+ },
+ "vEOS-lab": {
+ "extension": ".vmdk",
+ "prepend": "vEOS-lab",
+ "folder_level": 0
+ },
+ "EOS-2GB": {
+ "extension": ".swi",
+ "prepend": "EOS-2GB",
+ "folder_level": 0
+ },
+ "RN": {
+ "extension": "-",
+ "prepend": "RN",
+ "folder_level": 0
+ },
+ "SOURCE": {
+ "extension": "-source.tar",
+ "prepend": "EOS",
+ "folder_level": 0
+ },
+ "default": {
+ "extension": ".swi",
+ "prepend": "EOS",
+ "folder_level": 0
+ }
+ }
+}
diff --git a/eos_downloader/download.py b/eos_downloader/download.py
new file mode 100644
index 0000000..2297b04
--- /dev/null
+++ b/eos_downloader/download.py
@@ -0,0 +1,77 @@
+# flake8: noqa: F811
+# pylint: disable=unused-argument
+# pylint: disable=too-few-public-methods
+
+"""download module"""
+
+import os.path
+import signal
+from concurrent.futures import ThreadPoolExecutor
+from threading import Event
+from typing import Iterable, Any
+
+import requests
+import rich
+from rich import console
+from rich.progress import (BarColumn, DownloadColumn, Progress, TaskID,
+ TextColumn, TimeElapsedColumn, TransferSpeedColumn)
+
+console = rich.get_console()
+done_event = Event()
+
+
+def handle_sigint(signum: Any, frame: Any) -> None:
+ """Progress bar handler"""
+ done_event.set()
+
+
+signal.signal(signal.SIGINT, handle_sigint)
+
+
+class DownloadProgressBar():
+ """
+ Object to manage Download process with Progress Bar from Rich
+ """
+
+ def __init__(self) -> None:
+ """
+ Class Constructor
+ """
+ self.progress = Progress(
+ TextColumn("💾 Downloading [bold blue]{task.fields[filename]}", justify="right"),
+ BarColumn(bar_width=None),
+ "[progress.percentage]{task.percentage:>3.1f}%",
+ "•",
+ TransferSpeedColumn(),
+ "•",
+ DownloadColumn(),
+ "•",
+ TimeElapsedColumn(),
+ "•",
+ console=console
+ )
+
+ def _copy_url(self, task_id: TaskID, url: str, path: str, block_size: int = 1024) -> bool:
+ """Copy data from a url to a local file."""
+ response = requests.get(url, stream=True, timeout=5)
+ # This will break if the response doesn't contain content length
+ self.progress.update(task_id, total=int(response.headers['Content-Length']))
+ with open(path, "wb") as dest_file:
+ self.progress.start_task(task_id)
+ for data in response.iter_content(chunk_size=block_size):
+ dest_file.write(data)
+ self.progress.update(task_id, advance=len(data))
+ if done_event.is_set():
+ return True
+ # console.print(f"Downloaded {path}")
+ return False
+
+ def download(self, urls: Iterable[str], dest_dir: str) -> None:
+ """Download multuple files to the given directory."""
+ with self.progress:
+ with ThreadPoolExecutor(max_workers=4) as pool:
+ for url in urls:
+ filename = url.split("/")[-1].split('?')[0]
+ dest_path = os.path.join(dest_dir, filename)
+ task_id = self.progress.add_task("download", filename=filename, start=False)
+ pool.submit(self._copy_url, task_id, url, dest_path)
diff --git a/eos_downloader/eos.py b/eos_downloader/eos.py
new file mode 100644
index 0000000..e5f3670
--- /dev/null
+++ b/eos_downloader/eos.py
@@ -0,0 +1,177 @@
+#!/usr/bin/python
+# coding: utf-8 -*-
+# flake8: noqa: F811
+
+"""
+Specific EOS inheritance from object_download
+"""
+
+import os
+import xml.etree.ElementTree as ET
+from typing import List, Union
+
+import rich
+from loguru import logger
+from rich import console
+
+from eos_downloader.models.version import BASE_BRANCH_STR, BASE_VERSION_STR, REGEX_EOS_VERSION, RTYPE_FEATURE, EosVersion
+from eos_downloader.object_downloader import ObjectDownloader
+
+# logger = logging.getLogger(__name__)
+
+console = rich.get_console()
+
+class EOSDownloader(ObjectDownloader):
+ """
+ EOSDownloader Object to download EOS images from Arista.com website
+
+ Supercharge ObjectDownloader to support EOS specific actions
+
+ Parameters
+ ----------
+ ObjectDownloader : ObjectDownloader
+ Base object
+ """
+
+ eos_versions: Union[List[EosVersion], None] = None
+
+ @staticmethod
+ def _disable_ztp(file_path: str) -> None:
+ """
+ _disable_ztp Method to disable ZTP in EOS image
+
+ Create a file in the EOS image to disable ZTP process during initial boot
+
+ Parameters
+ ----------
+ file_path : str
+ Path where EOS image is located
+ """
+ logger.info('Mounting volume to disable ZTP')
+ console.print('🚀 Mounting volume to disable ZTP')
+ raw_folder = os.path.join(file_path, "raw")
+ os.system(f"rm -rf {raw_folder}")
+ os.system(f"mkdir -p {raw_folder}")
+ os.system(
+ f'guestmount -a {os.path.join(file_path, "hda.qcow2")} -m /dev/sda2 {os.path.join(file_path, "raw")}')
+ ztp_file = os.path.join(file_path, 'raw/zerotouch-config')
+ with open(ztp_file, 'w', encoding='ascii') as zfile:
+ zfile.write('DISABLE=True')
+ logger.info(f'Unmounting volume in {file_path}')
+ os.system(f"guestunmount {os.path.join(file_path, 'raw')}")
+ os.system(f"rm -rf {os.path.join(file_path, 'raw')}")
+ logger.info(f"Volume has been successfully unmounted at {file_path}")
+
+ def _parse_xml_for_version(self,root_xml: ET.ElementTree, xpath: str = './/dir[@label="Active Releases"]/dir/dir/[@label]') -> List[EosVersion]:
+ """
+ Extract list of available EOS versions from Arista.com website
+
+ Create a list of EosVersion object for all versions available on Arista.com
+
+ Args:
+ root_xml (ET.ElementTree): XML file with all versions available
+ xpath (str, optional): XPATH to use to extract EOS version. Defaults to './/dir[@label="Active Releases"]/dir/dir/[@label]'.
+
+ Returns:
+ List[EosVersion]: List of EosVersion representing all available EOS versions
+ """
+ # XPATH: .//dir[@label="Active Releases"]/dir/dir/[@label]
+ if self.eos_versions is None:
+ logger.debug(f'Using xpath {xpath}')
+ eos_versions = []
+ for node in root_xml.findall(xpath):
+ if 'label' in node.attrib and node.get('label') is not None:
+ label = node.get('label')
+ if label is not None and REGEX_EOS_VERSION.match(label):
+ eos_version = EosVersion.from_str(label)
+ eos_versions.append(eos_version)
+ logger.debug(f"Found {label} - {eos_version}")
+ logger.debug(f'List of versions found on arista.com is: {eos_versions}')
+ self.eos_versions = eos_versions
+ else:
+ logger.debug('receiving instruction to download versions, but already available')
+ return self.eos_versions
+
+ def _get_branches(self, with_rtype: str = RTYPE_FEATURE) -> List[str]:
+ """
+ Extract all EOS branches available from arista.com
+
+ Call self._parse_xml_for_version and then build list of available branches
+
+ Args:
+ rtype (str, optional): Release type to find. Can be M or F, default to F
+
+ Returns:
+ List[str]: A lsit of string that represent all availables EOS branches
+ """
+ root = self._get_folder_tree()
+ versions = self._parse_xml_for_version(root_xml=root)
+ return list({version.branch for version in versions if version.rtype == with_rtype})
+
+ def latest_branch(self, rtype: str = RTYPE_FEATURE) -> EosVersion:
+ """
+ Get latest branch from semver standpoint
+
+ Args:
+ rtype (str, optional): Release type to find. Can be M or F, default to F
+
+ Returns:
+ EosVersion: Latest Branch object
+ """
+ selected_branch = EosVersion.from_str(BASE_BRANCH_STR)
+ for branch in self._get_branches(with_rtype=rtype):
+ branch = EosVersion.from_str(branch)
+ if branch > selected_branch:
+ selected_branch = branch
+ return selected_branch
+
+ def get_eos_versions(self, branch: Union[str,None] = None, rtype: Union[str,None] = None) -> List[EosVersion]:
+ """
+ Get a list of available EOS version available on arista.com
+
+ If a branch is provided, only version in this branch are listed.
+ Otherwise, all versions are provided.
+
+ Args:
+ branch (str, optional): An EOS branch to filter. Defaults to None.
+ rtype (str, optional): Release type to find. Can be M or F, default to F
+
+ Returns:
+ List[EosVersion]: A list of versions available
+ """
+ root = self._get_folder_tree()
+ result = []
+ for version in self._parse_xml_for_version(root_xml=root):
+ if branch is None and (version.rtype == rtype or rtype is None):
+ result.append(version)
+ elif branch is not None and version.is_in_branch(branch) and version.rtype == rtype:
+ result.append(version)
+ return result
+
+ def latest_eos(self, branch: Union[str,None] = None, rtype: str = RTYPE_FEATURE) -> EosVersion:
+ """
+ Get latest version of EOS
+
+ If a branch is provided, only version in this branch are listed.
+ Otherwise, all versions are provided.
+ You can select what type of version to consider: M or F
+
+ Args:
+ branch (str, optional): An EOS branch to filter. Defaults to None.
+ rtype (str, optional): An EOS version type to filter, Can be M or F. Defaults to None.
+
+ Returns:
+ EosVersion: latest version selected
+ """
+ selected_version = EosVersion.from_str(BASE_VERSION_STR)
+ if branch is None:
+ latest_branch = self.latest_branch(rtype=rtype)
+ else:
+ latest_branch = EosVersion.from_str(branch)
+ for version in self.get_eos_versions(branch=str(latest_branch.branch), rtype=rtype):
+ if version > selected_version:
+ if rtype is not None and version.rtype == rtype:
+ selected_version = version
+ if rtype is None:
+ selected_version = version
+ return selected_version
diff --git a/eos_downloader/models/__init__.py b/eos_downloader/models/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/eos_downloader/models/__init__.py
diff --git a/eos_downloader/models/version.py b/eos_downloader/models/version.py
new file mode 100644
index 0000000..14448c1
--- /dev/null
+++ b/eos_downloader/models/version.py
@@ -0,0 +1,272 @@
+#!/usr/bin/python
+# coding: utf-8 -*-
+
+"""Module for EOS version management"""
+
+from __future__ import annotations
+
+import re
+import typing
+from typing import Any, Optional
+
+from loguru import logger
+from pydantic import BaseModel
+
+from eos_downloader.tools import exc_to_str
+
+# logger = logging.getLogger(__name__)
+
+BASE_VERSION_STR = '4.0.0F'
+BASE_BRANCH_STR = '4.0'
+
+RTYPE_FEATURE = 'F'
+RTYPE_MAINTENANCE = 'M'
+RTYPES = [RTYPE_FEATURE, RTYPE_MAINTENANCE]
+
+# Regular Expression to capture multiple EOS version format
+# 4.24
+# 4.23.0
+# 4.21.1M
+# 4.28.10.F
+# 4.28.6.1M
+REGEX_EOS_VERSION = re.compile(r"^.*(?P<major>4)\.(?P<minor>\d{1,2})\.(?P<patch>\d{1,2})(?P<other>\.\d*)*(?P<rtype>[M,F])*$")
+REGEX_EOS_BRANCH = re.compile(r"^.*(?P<major>4)\.(?P<minor>\d{1,2})(\.?P<patch>\d)*(\.\d)*(?P<rtype>[M,F])*$")
+
+
+class EosVersion(BaseModel):
+ """
+ EosVersion object to play with version management in code
+
+ Since EOS is not using strictly semver approach, this class mimic some functions from semver lib for Arista EOS versions
+ It is based on Pydantic and provides helpers for comparison:
+
+ Examples:
+ >>> eos_version_str = '4.23.2F'
+ >>> eos_version = EosVersion.from_str(eos_version_str)
+ >>> print(f'str representation is: {str(eos_version)}')
+ str representation is: 4.23.2F
+
+ >>> other_version = EosVersion.from_str(other_version_str)
+ >>> print(f'eos_version < other_version: {eos_version < other_version}')
+ eos_version < other_version: True
+
+ >>> print(f'Is eos_version match("<=4.23.3M"): {eos_version.match("<=4.23.3M")}')
+ Is eos_version match("<=4.23.3M"): True
+
+ >>> print(f'Is eos_version in branch 4.23: {eos_version.is_in_branch("4.23.0")}')
+ Is eos_version in branch 4.23: True
+
+ Args:
+ BaseModel (Pydantic): Pydantic Base Model
+ """
+ major: int = 4
+ minor: int = 0
+ patch: int = 0
+ rtype: Optional[str] = 'F'
+ other: Any
+
+ @classmethod
+ def from_str(cls, eos_version: str) -> EosVersion:
+ """
+ Class constructor from a string representing EOS version
+
+ Use regular expresion to extract fields from string.
+ It supports following formats:
+ - 4.24
+ - 4.23.0
+ - 4.21.1M
+ - 4.28.10.F
+ - 4.28.6.1M
+
+ Args:
+ eos_version (str): EOS version in str format
+
+ Returns:
+ EosVersion object
+ """
+ logger.debug(f'receiving version: {eos_version}')
+ if REGEX_EOS_VERSION.match(eos_version):
+ matches = REGEX_EOS_VERSION.match(eos_version)
+ # assert matches is not None
+ assert matches is not None
+ return cls(**matches.groupdict())
+ if REGEX_EOS_BRANCH.match(eos_version):
+ matches = REGEX_EOS_BRANCH.match(eos_version)
+ # assert matches is not None
+ assert matches is not None
+ return cls(**matches.groupdict())
+ logger.error(f'Error occured with {eos_version}')
+ return EosVersion()
+
+ @property
+ def branch(self) -> str:
+ """
+ Extract branch of version
+
+ Returns:
+ str: branch from version
+ """
+ return f'{self.major}.{self.minor}'
+
+ def __str__(self) -> str:
+ """
+ Standard str representation
+
+ Return string for EOS version like 4.23.3M
+
+ Returns:
+ str: A standard EOS version string representing <MAJOR>.<MINOR>.<PATCH><RTYPE>
+ """
+ if self.other is None:
+ return f'{self.major}.{self.minor}.{self.patch}{self.rtype}'
+ return f'{self.major}.{self.minor}.{self.patch}{self.other}{self.rtype}'
+
+ def _compare(self, other: EosVersion) -> float:
+ """
+ An internal comparison function to compare 2 EosVersion objects
+
+ Do a deep comparison from Major to Release Type
+ The return value is
+ - negative if ver1 < ver2,
+ - zero if ver1 == ver2
+ - strictly positive if ver1 > ver2
+
+ Args:
+ other (EosVersion): An EosVersion to compare with this object
+
+ Raises:
+ ValueError: Raise ValueError if input is incorrect type
+
+ Returns:
+ float: -1 if ver1 < ver2, 0 if ver1 == ver2, 1 if ver1 > ver2
+ """
+ if not isinstance(other, EosVersion):
+ raise ValueError(f'could not compare {other} as it is not an EosVersion object')
+ comparison_flag: float = 0
+ logger.warning(f'current version {self.__str__()} - other {str(other)}') # pylint: disable = unnecessary-dunder-call
+ for key, _ in self.dict().items():
+ if comparison_flag == 0 and self.dict()[key] is None or other.dict()[key] is None:
+ logger.debug(f'{key}: local None - remote None')
+ logger.debug(f'{key}: local {self.dict()} - remote {other.dict()}')
+ return comparison_flag
+ logger.debug(f'{key}: local {self.dict()[key]} - remote {other.dict()[key]}')
+ if comparison_flag == 0 and self.dict()[key] < other.dict()[key]:
+ comparison_flag = -1
+ if comparison_flag == 0 and self.dict()[key] > other.dict()[key]:
+ comparison_flag = 1
+ if comparison_flag != 0:
+ logger.info(f'comparison result is {comparison_flag}')
+ return comparison_flag
+ logger.info(f'comparison result is {comparison_flag}')
+ return comparison_flag
+
+ @typing.no_type_check
+ def __eq__(self, other):
+ """ Implement __eq__ function (==) """
+ return self._compare(other) == 0
+
+ @typing.no_type_check
+ def __ne__(self, other):
+ # type: ignore
+ """ Implement __nw__ function (!=) """
+ return self._compare(other) != 0
+
+ @typing.no_type_check
+ def __lt__(self, other):
+ # type: ignore
+ """ Implement __lt__ function (<) """
+ return self._compare(other) < 0
+
+ @typing.no_type_check
+ def __le__(self, other):
+ # type: ignore
+ """ Implement __le__ function (<=) """
+ return self._compare(other) <= 0
+
+ @typing.no_type_check
+ def __gt__(self, other):
+ # type: ignore
+ """ Implement __gt__ function (>) """
+ return self._compare(other) > 0
+
+ @typing.no_type_check
+ def __ge__(self, other):
+ # type: ignore
+ """ Implement __ge__ function (>=) """
+ return self._compare(other) >= 0
+
+ def match(self, match_expr: str) -> bool:
+ """
+ Compare self to match a match expression.
+
+ Example:
+ >>> eos_version.match("<=4.23.3M")
+ True
+ >>> eos_version.match("==4.23.3M")
+ False
+
+ Args:
+ match_expr (str): optional operator and version; valid operators are
+ ``<`` smaller than
+ ``>`` greater than
+ ``>=`` greator or equal than
+ ``<=`` smaller or equal than
+ ``==`` equal
+ ``!=`` not equal
+
+ Raises:
+ ValueError: If input has no match_expr nor match_ver
+
+ Returns:
+ bool: True if the expression matches the version, otherwise False
+ """
+ prefix = match_expr[:2]
+ if prefix in (">=", "<=", "==", "!="):
+ match_version = match_expr[2:]
+ elif prefix and prefix[0] in (">", "<"):
+ prefix = prefix[0]
+ match_version = match_expr[1:]
+ elif match_expr and match_expr[0] in "0123456789":
+ prefix = "=="
+ match_version = match_expr
+ else:
+ raise ValueError(
+ "match_expr parameter should be in format <op><ver>, "
+ "where <op> is one of "
+ "['<', '>', '==', '<=', '>=', '!=']. "
+ f"You provided: {match_expr}"
+ )
+ logger.debug(f'work on comparison {prefix} with base release {match_version}')
+ possibilities_dict = {
+ ">": (1,),
+ "<": (-1,),
+ "==": (0,),
+ "!=": (-1, 1),
+ ">=": (0, 1),
+ "<=": (-1, 0),
+ }
+ possibilities = possibilities_dict[prefix]
+ cmp_res = self._compare(EosVersion.from_str(match_version))
+
+ return cmp_res in possibilities
+
+ def is_in_branch(self, branch_str: str) -> bool:
+ """
+ Check if current version is part of a branch version
+
+ Comparison is done across MAJOR and MINOR
+
+ Args:
+ branch_str (str): a string for EOS branch. It supports following formats 4.23 or 4.23.0
+
+ Returns:
+ bool: True if current version is in provided branch, otherwise False
+ """
+ try:
+ logger.debug(f'reading branch str:{branch_str}')
+ branch = EosVersion.from_str(branch_str)
+ except Exception as error: # pylint: disable = broad-exception-caught
+ logger.error(exc_to_str(error))
+ else:
+ return self.major == branch.major and self.minor == branch.minor
+ return False
diff --git a/eos_downloader/object_downloader.py b/eos_downloader/object_downloader.py
new file mode 100644
index 0000000..0420acb
--- /dev/null
+++ b/eos_downloader/object_downloader.py
@@ -0,0 +1,513 @@
+#!/usr/bin/python
+# coding: utf-8 -*-
+# flake8: noqa: F811
+# pylint: disable=too-many-instance-attributes
+# pylint: disable=too-many-arguments
+
+"""
+eos_downloader class definition
+"""
+
+from __future__ import (absolute_import, division, print_function,
+ unicode_literals, annotations)
+
+import base64
+import glob
+import hashlib
+import json
+import os
+import sys
+import xml.etree.ElementTree as ET
+from typing import Union
+
+import requests
+import rich
+from loguru import logger
+from rich import console
+from tqdm import tqdm
+
+from eos_downloader import (ARISTA_DOWNLOAD_URL, ARISTA_GET_SESSION,
+ ARISTA_SOFTWARE_FOLDER_TREE, EVE_QEMU_FOLDER_PATH,
+ MSG_INVALID_DATA, MSG_TOKEN_EXPIRED)
+from eos_downloader.data import DATA_MAPPING
+from eos_downloader.download import DownloadProgressBar
+
+# logger = logging.getLogger(__name__)
+
+console = rich.get_console()
+
+
+class ObjectDownloader():
+ """
+ ObjectDownloader Generic Object to download from Arista.com
+ """
+ def __init__(self, image: str, version: str, token: str, software: str = 'EOS', hash_method: str = 'md5sum'):
+ """
+ __init__ Class constructor
+
+ generic class constructor
+
+ Parameters
+ ----------
+ image : str
+ Type of image to download
+ version : str
+ Version of the package to download
+ token : str
+ Arista API token
+ software : str, optional
+ Package name to download (vEOS-lab, cEOS, EOS, ...), by default 'EOS'
+ hash_method : str, optional
+ Hash protocol to use to check download, by default 'md5sum'
+ """
+ self.software = software
+ self.image = image
+ self._version = version
+ self.token = token
+ self.folder_level = 0
+ self.session_id = None
+ self.filename = self._build_filename()
+ self.hash_method = hash_method
+ self.timeout = 5
+ # Logging
+ logger.debug(f'Filename built by _build_filename is {self.filename}')
+
+ def __str__(self) -> str:
+ return f'{self.software} - {self.image} - {self.version}'
+
+ # def __repr__(self):
+ # return str(self.__dict__)
+
+ @property
+ def version(self) -> str:
+ """Get version."""
+ return self._version
+
+ @version.setter
+ def version(self, value: str) -> None:
+ """Set version."""
+ self._version = value
+ self.filename = self._build_filename()
+
+ # ------------------------------------------------------------------------ #
+ # Internal METHODS
+ # ------------------------------------------------------------------------ #
+
+ def _build_filename(self) -> str:
+ """
+ _build_filename Helper to build filename to search on arista.com
+
+ Returns
+ -------
+ str:
+ Filename to search for on Arista.com
+ """
+ logger.info('start build')
+ if self.software in DATA_MAPPING:
+ logger.info(f'software in data mapping: {self.software}')
+ if self.image in DATA_MAPPING[self.software]:
+ logger.info(f'image in data mapping: {self.image}')
+ return f"{DATA_MAPPING[self.software][self.image]['prepend']}-{self.version}{DATA_MAPPING[self.software][self.image]['extension']}"
+ return f"{DATA_MAPPING[self.software]['default']['prepend']}-{self.version}{DATA_MAPPING[self.software]['default']['extension']}"
+ raise ValueError(f'Incorrect value for software {self.software}')
+
+ def _parse_xml_for_path(self, root_xml: ET.ElementTree, xpath: str, search_file: str) -> str:
+ # sourcery skip: remove-unnecessary-cast
+ """
+ _parse_xml Read and extract data from XML using XPATH
+
+ Get all interested nodes using XPATH and then get node that match search_file
+
+ Parameters
+ ----------
+ root_xml : ET.ElementTree
+ XML document
+ xpath : str
+ XPATH expression to filter XML
+ search_file : str
+ Filename to search for
+
+ Returns
+ -------
+ str
+ File Path on Arista server side
+ """
+ logger.debug(f'Using xpath {xpath}')
+ logger.debug(f'Search for file {search_file}')
+ console.print(f'🔎 Searching file {search_file}')
+ for node in root_xml.findall(xpath):
+ # logger.debug('Found {}', node.text)
+ if str(node.text).lower() == search_file.lower():
+ path = node.get('path')
+ console.print(f' -> Found file at {path}')
+ logger.info(f'Found {node.text} at {node.get("path")}')
+ return str(node.get('path')) if node.get('path') is not None else ''
+ logger.error(f'Requested file ({self.filename}) not found !')
+ return ''
+
+ def _get_hash(self, file_path: str) -> str:
+ """
+ _get_hash Download HASH file from Arista server
+
+ Parameters
+ ----------
+ file_path : str
+ Path of the HASH file
+
+ Returns
+ -------
+ str
+ Hash string read from HASH file downloaded from Arista.com
+ """
+ remote_hash_file = self._get_remote_hashpath(hash_method=self.hash_method)
+ hash_url = self._get_url(remote_file_path=remote_hash_file)
+ # hash_downloaded = self._download_file_raw(url=hash_url, file_path=file_path + "/" + os.path.basename(remote_hash_file))
+ dl_rich_progress_bar = DownloadProgressBar()
+ dl_rich_progress_bar.download(urls=[hash_url], dest_dir=file_path)
+ hash_downloaded = f"{file_path}/{os.path.basename(remote_hash_file)}"
+ hash_content = 'unset'
+ with open(hash_downloaded, 'r', encoding='utf-8') as f:
+ hash_content = f.read()
+ return hash_content.split(' ')[0]
+
+ @staticmethod
+ def _compute_hash_md5sum(file: str, hash_expected: str) -> bool:
+ """
+ _compute_hash_md5sum Compare MD5 sum
+
+ Do comparison between local md5 of the file and value provided by arista.com
+
+ Parameters
+ ----------
+ file : str
+ Local file to use for MD5 sum
+ hash_expected : str
+ MD5 from arista.com
+
+ Returns
+ -------
+ bool
+ True if both are equal, False if not
+ """
+ hash_md5 = hashlib.md5()
+ with open(file, "rb") as f:
+ for chunk in iter(lambda: f.read(4096), b""):
+ hash_md5.update(chunk)
+ if hash_md5.hexdigest() == hash_expected:
+ return True
+ logger.warning(f'Downloaded file is corrupt: local md5 ({hash_md5.hexdigest()}) is different to md5 from arista ({hash_expected})')
+ return False
+
+ @staticmethod
+ def _compute_hash_sh512sum(file: str, hash_expected: str) -> bool:
+ """
+ _compute_hash_sh512sum Compare SHA512 sum
+
+ Do comparison between local sha512 of the file and value provided by arista.com
+
+ Parameters
+ ----------
+ file : str
+ Local file to use for MD5 sum
+ hash_expected : str
+ SHA512 from arista.com
+
+ Returns
+ -------
+ bool
+ True if both are equal, False if not
+ """
+ hash_sha512 = hashlib.sha512()
+ with open(file, "rb") as f:
+ for chunk in iter(lambda: f.read(4096), b""):
+ hash_sha512.update(chunk)
+ if hash_sha512.hexdigest() == hash_expected:
+ return True
+ logger.warning(f'Downloaded file is corrupt: local sha512 ({hash_sha512.hexdigest()}) is different to sha512 from arista ({hash_expected})')
+ return False
+
+ def _get_folder_tree(self) -> ET.ElementTree:
+ """
+ _get_folder_tree Download XML tree from Arista server
+
+ Returns
+ -------
+ ET.ElementTree
+ XML document
+ """
+ if self.session_id is None:
+ self.authenticate()
+ jsonpost = {'sessionCode': self.session_id}
+ result = requests.post(ARISTA_SOFTWARE_FOLDER_TREE, data=json.dumps(jsonpost), timeout=self.timeout)
+ try:
+ folder_tree = result.json()["data"]["xml"]
+ return ET.ElementTree(ET.fromstring(folder_tree))
+ except KeyError as error:
+ logger.error(MSG_INVALID_DATA)
+ logger.error(f'Server returned: {error}')
+ console.print(f'❌ {MSG_INVALID_DATA}', style="bold red")
+ sys.exit(1)
+
+ def _get_remote_filepath(self) -> str:
+ """
+ _get_remote_filepath Helper to get path of the file to download
+
+ Set XPATH and return result of _parse_xml for the file to download
+
+ Returns
+ -------
+ str
+ Remote path of the file to download
+ """
+ root = self._get_folder_tree()
+ logger.debug("GET XML content from ARISTA.com")
+ xpath = f'.//dir[@label="{self.software}"]//file'
+ return self._parse_xml_for_path(root_xml=root, xpath=xpath, search_file=self.filename)
+
+ def _get_remote_hashpath(self, hash_method: str = 'md5sum') -> str:
+ """
+ _get_remote_hashpath Helper to get path of the hash's file to download
+
+ Set XPATH and return result of _parse_xml for the file to download
+
+ Returns
+ -------
+ str
+ Remote path of the hash's file to download
+ """
+ root = self._get_folder_tree()
+ logger.debug("GET XML content from ARISTA.com")
+ xpath = f'.//dir[@label="{self.software}"]//file'
+ return self._parse_xml_for_path(
+ root_xml=root,
+ xpath=xpath,
+ search_file=f'{self.filename}.{hash_method}',
+ )
+
+ def _get_url(self, remote_file_path: str) -> str:
+ """
+ _get_url Get URL to use for downloading file from Arista server
+
+ Send remote_file_path to get correct URL to use for download
+
+ Parameters
+ ----------
+ remote_file_path : str
+ Filepath from XML to use to get correct download link
+
+ Returns
+ -------
+ str
+ URL link to use for download
+ """
+ if self.session_id is None:
+ self.authenticate()
+ jsonpost = {'sessionCode': self.session_id, 'filePath': remote_file_path}
+ result = requests.post(ARISTA_DOWNLOAD_URL, data=json.dumps(jsonpost), timeout=self.timeout)
+ if 'data' in result.json() and 'url' in result.json()['data']:
+ # logger.debug('URL to download file is: {}', result.json())
+ return result.json()["data"]["url"]
+ logger.critical(f'Server returns following message: {result.json()}')
+ return ''
+
+ @staticmethod
+ def _download_file_raw(url: str, file_path: str) -> str:
+ """
+ _download_file Helper to download file from Arista.com
+
+ [extended_summary]
+
+ Parameters
+ ----------
+ url : str
+ URL provided by server for remote_file_path
+ file_path : str
+ Location where to save local file
+
+ Returns
+ -------
+ str
+ File path
+ """
+ chunkSize = 1024
+ r = requests.get(url, stream=True, timeout=5)
+ with open(file_path, 'wb') as f:
+ pbar = tqdm(unit="B", total=int(r.headers['Content-Length']), unit_scale=True, unit_divisor=1024)
+ for chunk in r.iter_content(chunk_size=chunkSize):
+ if chunk:
+ pbar.update(len(chunk))
+ f.write(chunk)
+ return file_path
+
+ def _download_file(self, file_path: str, filename: str, rich_interface: bool = True) -> Union[None, str]:
+ remote_file_path = self._get_remote_filepath()
+ logger.info(f'File found on arista server: {remote_file_path}')
+ file_url = self._get_url(remote_file_path=remote_file_path)
+ if file_url is not False:
+ if not rich_interface:
+ return self._download_file_raw(url=file_url, file_path=os.path.join(file_path, filename))
+ rich_downloader = DownloadProgressBar()
+ rich_downloader.download(urls=[file_url], dest_dir=file_path)
+ return os.path.join(file_path, filename)
+ logger.error(f'Cannot download file {file_path}')
+ return None
+
+ @staticmethod
+ def _create_destination_folder(path: str) -> None:
+ # os.makedirs(path, mode, exist_ok=True)
+ os.system(f'mkdir -p {path}')
+
+ @staticmethod
+ def _disable_ztp(file_path: str) -> None:
+ pass
+
+ # ------------------------------------------------------------------------ #
+ # Public METHODS
+ # ------------------------------------------------------------------------ #
+
+ def authenticate(self) -> bool:
+ """
+ authenticate Authenticate user on Arista.com server
+
+ Send API token and get a session-id from remote server.
+ Session-id will be used by all other functions.
+
+ Returns
+ -------
+ bool
+ True if authentication succeeds=, False in all other situations.
+ """
+ credentials = (base64.b64encode(self.token.encode())).decode("utf-8")
+ session_code_url = ARISTA_GET_SESSION
+ jsonpost = {'accessToken': credentials}
+
+ result = requests.post(session_code_url, data=json.dumps(jsonpost), timeout=self.timeout)
+
+ if result.json()["status"]["message"] in[ 'Access token expired', 'Invalid access token']:
+ console.print(f'❌ {MSG_TOKEN_EXPIRED}', style="bold red")
+ logger.error(MSG_TOKEN_EXPIRED)
+ return False
+
+ try:
+ if 'data' in result.json():
+ self.session_id = result.json()["data"]["session_code"]
+ logger.info('Authenticated on arista.com')
+ return True
+ logger.debug(f'{result.json()}')
+ return False
+ except KeyError as error_arista:
+ logger.error(f'Error: {error_arista}')
+ sys.exit(1)
+
+ def download_local(self, file_path: str, checksum: bool = False) -> bool:
+ # sourcery skip: move-assign
+ """
+ download_local Entrypoint for local download feature
+
+ Do local downnload feature:
+ - Get remote file path
+ - Get URL from Arista.com
+ - Download file
+ - Do HASH comparison (optional)
+
+ Parameters
+ ----------
+ file_path : str
+ Local path to save downloaded file
+ checksum : bool, optional
+ Execute checksum or not, by default False
+
+ Returns
+ -------
+ bool
+ True if everything went well, False if any problem appears
+ """
+ file_downloaded = str(self._download_file(file_path=file_path, filename=self.filename))
+
+ # Check file HASH
+ hash_result = False
+ if checksum:
+ logger.info('🚀 Running checksum validation')
+ console.print('🚀 Running checksum validation')
+ if self.hash_method == 'md5sum':
+ hash_expected = self._get_hash(file_path=file_path)
+ hash_result = self._compute_hash_md5sum(file=file_downloaded, hash_expected=hash_expected)
+ elif self.hash_method == 'sha512sum':
+ hash_expected = self._get_hash(file_path=file_path)
+ hash_result = self._compute_hash_sh512sum(file=file_downloaded, hash_expected=hash_expected)
+ if not hash_result:
+ logger.error('Downloaded file is corrupted, please check your connection')
+ console.print('❌ Downloaded file is corrupted, please check your connection')
+ return False
+ logger.info('Downloaded file is correct.')
+ console.print('✅ Downloaded file is correct.')
+ return True
+
+ def provision_eve(self, noztp: bool = False, checksum: bool = True) -> None:
+ # pylint: disable=unused-argument
+ """
+ provision_eve Entrypoint for EVE-NG download and provisioning
+
+ Do following actions:
+ - Get remote file path
+ - Get URL from file path
+ - Download file
+ - Convert file to qcow2 format
+ - Create new version to EVE-NG
+ - Disable ZTP (optional)
+
+ Parameters
+ ----------
+ noztp : bool, optional
+ Flag to deactivate ZTP in EOS image, by default False
+ checksum : bool, optional
+ Flag to ask for hash validation, by default True
+ """
+ # Build image name to use in folder path
+ eos_image_name = self.filename.rstrip(".vmdk").lower()
+ if noztp:
+ eos_image_name = f'{eos_image_name}-noztp'
+ # Create full path for EVE-NG
+ file_path = os.path.join(EVE_QEMU_FOLDER_PATH, eos_image_name.rstrip())
+ # Create folders in filesystem
+ self._create_destination_folder(path=file_path)
+
+ # Download file to local destination
+ file_downloaded = self._download_file(
+ file_path=file_path, filename=self.filename)
+
+ # Convert to QCOW2 format
+ file_qcow2 = os.path.join(file_path, "hda.qcow2")
+ logger.info('Converting VMDK to QCOW2 format')
+ console.print('🚀 Converting VMDK to QCOW2 format...')
+
+ os.system(f'$(which qemu-img) convert -f vmdk -O qcow2 {file_downloaded} {file_qcow2}')
+
+ logger.info('Applying unl_wrapper to fix permissions')
+ console.print('Applying unl_wrapper to fix permissions')
+
+ os.system('/opt/unetlab/wrappers/unl_wrapper -a fixpermissions')
+ os.system(f'rm -f {file_downloaded}')
+
+ if noztp:
+ self._disable_ztp(file_path=file_path)
+
+ def docker_import(self, image_name: str = "arista/ceos") -> None:
+ """
+ Import docker container to your docker server.
+
+ Import downloaded container to your local docker engine.
+
+ Args:
+ version (str):
+ image_name (str, optional): Image name to use. Defaults to "arista/ceos".
+ """
+ docker_image = f'{image_name}:{self.version}'
+ logger.info(f'Importing image {self.filename} to {docker_image}')
+ console.print(f'🚀 Importing image {self.filename} to {docker_image}')
+ os.system(f'$(which docker) import {self.filename} {docker_image}')
+ for filename in glob.glob(f'{self.filename}*'):
+ try:
+ os.remove(filename)
+ except FileNotFoundError:
+ console.print(f'File not found: {filename}')
diff --git a/eos_downloader/tools.py b/eos_downloader/tools.py
new file mode 100644
index 0000000..a0f971a
--- /dev/null
+++ b/eos_downloader/tools.py
@@ -0,0 +1,13 @@
+#!/usr/bin/python
+# coding: utf-8 -*-
+
+"""Module for tools related to ardl"""
+
+
+def exc_to_str(exception: Exception) -> str:
+ """
+ Helper function to parse Exceptions
+ """
+ return (
+ f"{type(exception).__name__}{f' ({str(exception)})' if str(exception) else ''}"
+ )