diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-10-10 09:54:13 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-10-10 09:54:13 +0000 |
commit | 26cac87f1cb90abc2fac2144a39ad88b3b70031d (patch) | |
tree | f97c513314a569fb51f1c8580b012d418c05fede /eos_downloader | |
parent | Adding upstream version 0.8.2. (diff) | |
download | eos-downloader-upstream/0.9.0.tar.xz eos-downloader-upstream/0.9.0.zip |
Adding upstream version 0.9.0.upstream/0.9.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | eos_downloader/__init__.py | 27 | ||||
-rw-r--r-- | eos_downloader/cli/cli.py | 48 | ||||
-rw-r--r-- | eos_downloader/cli/debug/commands.py | 41 | ||||
-rw-r--r-- | eos_downloader/cli/get/commands.py | 211 | ||||
-rw-r--r-- | eos_downloader/cli/info/commands.py | 86 | ||||
-rw-r--r-- | eos_downloader/cli/utils.py | 38 | ||||
-rw-r--r-- | eos_downloader/cvp.py | 85 | ||||
-rw-r--r-- | eos_downloader/data.py | 92 | ||||
-rw-r--r-- | eos_downloader/download.py | 33 | ||||
-rw-r--r-- | eos_downloader/eos.py | 68 | ||||
-rw-r--r-- | eos_downloader/models/version.py | 71 | ||||
-rw-r--r-- | eos_downloader/object_downloader.py | 223 |
12 files changed, 658 insertions, 365 deletions
diff --git a/eos_downloader/__init__.py b/eos_downloader/__init__.py index 345ccf7..4507a70 100644 --- a/eos_downloader/__init__.py +++ b/eos_downloader/__init__.py @@ -5,23 +5,31 @@ EOS Downloader module. """ -from __future__ import (absolute_import, division, - print_function, unicode_literals, annotations) +from __future__ import ( + absolute_import, + annotations, + division, + print_function, + unicode_literals, +) + import dataclasses -from typing import Any -import json import importlib.metadata +import json +from typing import Any -__author__ = '@titom73' -__email__ = 'tom@inetsix.net' -__date__ = '2022-03-16' +__author__ = "@titom73" +__email__ = "tom@inetsix.net" +__date__ = "2022-03-16" __version__ = importlib.metadata.version("eos-downloader") # __all__ = ["CvpAuthenticationItem", "CvFeatureManager", "EOSDownloader", "ObjectDownloader", "reverse"] ARISTA_GET_SESSION = "https://www.arista.com/custom_data/api/cvp/getSessionCode/" -ARISTA_SOFTWARE_FOLDER_TREE = "https://www.arista.com/custom_data/api/cvp/getFolderTree/" +ARISTA_SOFTWARE_FOLDER_TREE = ( + "https://www.arista.com/custom_data/api/cvp/getFolderTree/" +) ARISTA_DOWNLOAD_URL = "https://www.arista.com/custom_data/api/cvp/getDownloadLink/" @@ -36,11 +44,12 @@ check the Access Token. Then re-run the script with the correct token. MSG_INVALID_DATA = """Invalid data returned by server """ -EVE_QEMU_FOLDER_PATH = '/opt/unetlab/addons/qemu/' +EVE_QEMU_FOLDER_PATH = "/opt/unetlab/addons/qemu/" class EnhancedJSONEncoder(json.JSONEncoder): """Custom JSon encoder.""" + def default(self, o: Any) -> Any: if dataclasses.is_dataclass(o): return dataclasses.asdict(o) diff --git a/eos_downloader/cli/cli.py b/eos_downloader/cli/cli.py index ddd0dea..ad77f2b 100644 --- a/eos_downloader/cli/cli.py +++ b/eos_downloader/cli/cli.py @@ -11,49 +11,51 @@ ARDL CLI Baseline. """ import click -from rich.console import Console -import eos_downloader -from eos_downloader.cli.get import commands as get_commands + +from eos_downloader import __version__ from eos_downloader.cli.debug import commands as debug_commands +from eos_downloader.cli.get import commands as get_commands from eos_downloader.cli.info import commands as info_commands +from eos_downloader.cli.utils import AliasedGroup + -@click.group() +@click.group(cls=AliasedGroup) +@click.version_option(__version__) @click.pass_context -@click.option('--token', show_envvar=True, default=None, help='Arista Token from your customer account') +@click.option( + "--token", + show_envvar=True, + default=None, + help="Arista Token from your customer account", +) def ardl(ctx: click.Context, token: str) -> None: """Arista Network Download CLI""" ctx.ensure_object(dict) - ctx.obj['token'] = token + ctx.obj["token"] = token -@click.command() -def version() -> None: - """Display version of ardl""" - console = Console() - console.print(f'ardl is running version {eos_downloader.__version__}') - - -@ardl.group(no_args_is_help=True) +@ardl.group(cls=AliasedGroup, no_args_is_help=True) @click.pass_context -def get(ctx: click.Context) -> None: +def get(ctx: click.Context, cls: click.Group = AliasedGroup) -> None: # pylint: disable=redefined-builtin """Download Arista from Arista website""" -@ardl.group(no_args_is_help=True) +@ardl.group(cls=AliasedGroup, no_args_is_help=True) @click.pass_context -def info(ctx: click.Context) -> None: +def info(ctx: click.Context, cls: click.Group = AliasedGroup) -> None: # pylint: disable=redefined-builtin """List information from Arista website""" -@ardl.group(no_args_is_help=True) +@ardl.group(cls=AliasedGroup, no_args_is_help=True) @click.pass_context -def debug(ctx: click.Context) -> None: +def debug(ctx: click.Context, cls: click.Group = AliasedGroup) -> None: # pylint: disable=redefined-builtin """Debug commands to work with ardl""" + # ANTA CLI Execution @@ -64,13 +66,9 @@ def cli() -> None: get.add_command(get_commands.cvp) info.add_command(info_commands.eos_versions) debug.add_command(debug_commands.xml) - ardl.add_command(version) # Load CLI - ardl( - obj={}, - auto_envvar_prefix='arista' - ) + ardl(obj={}, auto_envvar_prefix="arista") -if __name__ == '__main__': +if __name__ == "__main__": cli() diff --git a/eos_downloader/cli/debug/commands.py b/eos_downloader/cli/debug/commands.py index 107b8a0..5a0d7f8 100644 --- a/eos_downloader/cli/debug/commands.py +++ b/eos_downloader/cli/debug/commands.py @@ -22,32 +22,51 @@ import eos_downloader.eos @click.command() @click.pass_context -@click.option('--output', default=str('arista.xml'), help='Path to save XML file', type=click.Path(), show_default=True) -@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False)) +@click.option( + "--output", + default=str("arista.xml"), + help="Path to save XML file", + type=click.Path(), + show_default=True, +) +@click.option( + "--log-level", + "--log", + help="Logging level of the command", + default=None, + type=click.Choice( + ["debug", "info", "warning", "error", "critical"], case_sensitive=False + ), +) def xml(ctx: click.Context, output: str, log_level: str) -> None: # sourcery skip: remove-unnecessary-cast """Extract XML directory structure""" console = Console() # Get from Context - token = ctx.obj['token'] + token = ctx.obj["token"] logger.remove() if log_level is not None: logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper()) my_download = eos_downloader.eos.EOSDownloader( - image='unset', - software='EOS', - version='unset', + image="unset", + software="EOS", + version="unset", token=token, - hash_method='sha512sum') + hash_method="sha512sum", + ) my_download.authenticate() - xml_object: ET.ElementTree = my_download._get_folder_tree() # pylint: disable=protected-access + xml_object: ET.ElementTree = ( + my_download.get_folder_tree() + ) # pylint: disable=protected-access xml_content = xml_object.getroot() - xmlstr = minidom.parseString(ET.tostring(xml_content)).toprettyxml(indent=" ", newl='') - with open(output, "w", encoding='utf-8') as f: + xmlstr = minidom.parseString(ET.tostring(xml_content)).toprettyxml( + indent=" ", newl="" + ) + with open(output, "w", encoding="utf-8") as f: f.write(str(xmlstr)) - console.print(f'XML file saved in: { output }') + console.print(f"XML file saved in: { output }") diff --git a/eos_downloader/cli/get/commands.py b/eos_downloader/cli/get/commands.py index 13a8eec..b4525fe 100644 --- a/eos_downloader/cli/get/commands.py +++ b/eos_downloader/cli/get/commands.py @@ -21,68 +21,156 @@ from rich.console import Console import eos_downloader.eos from eos_downloader.models.version import BASE_VERSION_STR, RTYPE_FEATURE, RTYPES -EOS_IMAGE_TYPE = ['64', 'INT', '2GB-INT', 'cEOS', 'cEOS64', 'vEOS', 'vEOS-lab', 'EOS-2GB', 'default'] -CVP_IMAGE_TYPE = ['ova', 'rpm', 'kvm', 'upgrade'] +EOS_IMAGE_TYPE = [ + "64", + "INT", + "2GB-INT", + "cEOS", + "cEOS64", + "vEOS", + "vEOS-lab", + "EOS-2GB", + "default", +] +CVP_IMAGE_TYPE = ["ova", "rpm", "kvm", "upgrade"] + @click.command(no_args_is_help=True) @click.pass_context -@click.option('--image-type', default='default', help='EOS Image type', type=click.Choice(EOS_IMAGE_TYPE), required=True) -@click.option('--version', default=None, help='EOS version', type=str, required=False) -@click.option('--latest', '-l', is_flag=True, type=click.BOOL, default=False, help='Get latest version in given branch. If --branch is not use, get the latest branch with specific release type') -@click.option('--release-type', '-rtype', type=click.Choice(RTYPES, case_sensitive=False), default=RTYPE_FEATURE, help='EOS release type to search') -@click.option('--branch', '-b', type=click.STRING, default=None, help='EOS Branch to list releases') -@click.option('--docker-name', default='arista/ceos', help='Docker image name (default: arista/ceos)', type=str, show_default=True) -@click.option('--output', default=str(os.path.relpath(os.getcwd(), start=os.curdir)), help='Path to save image', type=click.Path(),show_default=True) +@click.option( + "--image-type", + default="default", + help="EOS Image type", + type=click.Choice(EOS_IMAGE_TYPE), + required=True, +) +@click.option("--version", default=None, help="EOS version", type=str, required=False) +@click.option( + "--latest", + "-l", + is_flag=True, + type=click.BOOL, + default=False, + help="Get latest version in given branch. If --branch is not use, get the latest branch with specific release type", +) +@click.option( + "--release-type", + "-rtype", + type=click.Choice(RTYPES, case_sensitive=False), + default=RTYPE_FEATURE, + help="EOS release type to search", +) +@click.option( + "--branch", + "-b", + type=click.STRING, + default=None, + help="EOS Branch to list releases", +) +@click.option( + "--docker-name", + default="arista/ceos", + help="Docker image name (default: arista/ceos)", + type=str, + show_default=True, +) +@click.option( + "--output", + default=str(os.path.relpath(os.getcwd(), start=os.curdir)), + help="Path to save image", + type=click.Path(), + show_default=True, +) # Debugging -@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False)) +@click.option( + "--log-level", + "--log", + help="Logging level of the command", + default=None, + type=click.Choice( + ["debug", "info", "warning", "error", "critical"], case_sensitive=False + ), +) # Boolean triggers -@click.option('--eve-ng', is_flag=True, help='Run EVE-NG vEOS provisioning (only if CLI runs on an EVE-NG server)', default=False) -@click.option('--disable-ztp', is_flag=True, help='Disable ZTP process in vEOS image (only available with --eve-ng)', default=False) -@click.option('--import-docker', is_flag=True, help='Import docker image (only available with --image_type cEOSlab)', default=False) +@click.option( + "--eve-ng", + is_flag=True, + help="Run EVE-NG vEOS provisioning (only if CLI runs on an EVE-NG server)", + default=False, +) +@click.option( + "--disable-ztp", + is_flag=True, + help="Disable ZTP process in vEOS image (only available with --eve-ng)", + default=False, +) +@click.option( + "--import-docker", + is_flag=True, + help="Import docker image (only available with --image_type cEOSlab)", + default=False, +) def eos( - ctx: click.Context, image_type: str, output: str, log_level: str, eve_ng: bool, disable_ztp: bool, - import_docker: bool, docker_name: str, version: Union[str, None] = None, release_type: str = RTYPE_FEATURE, - latest: bool = False, branch: Union[str,None] = None - ) -> int: + ctx: click.Context, + image_type: str, + output: str, + log_level: str, + eve_ng: bool, + disable_ztp: bool, + import_docker: bool, + docker_name: str, + version: Union[str, None] = None, + release_type: str = RTYPE_FEATURE, + latest: bool = False, + branch: Union[str, None] = None, +) -> int: """Download EOS image from Arista website""" console = Console() # Get from Context - token = ctx.obj['token'] - if token is None or token == '': - console.print('❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option', style="bold red") + token = ctx.obj["token"] + if token is None or token == "": + console.print( + "❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option", + style="bold red", + ) sys.exit(1) logger.remove() if log_level is not None: logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper()) - console.print("🪐 [bold blue]eos-downloader[/bold blue] is starting...", ) - console.print(f' - Image Type: {image_type}') - console.print(f' - Version: {version}') - + console.print( + "🪐 [bold blue]eos-downloader[/bold blue] is starting...", + ) + console.print(f" - Image Type: {image_type}") + console.print(f" - Version: {version}") if version is not None: my_download = eos_downloader.eos.EOSDownloader( image=image_type, - software='EOS', + software="EOS", version=version, token=token, - hash_method='sha512sum') + hash_method="sha512sum", + ) my_download.authenticate() elif latest: my_download = eos_downloader.eos.EOSDownloader( image=image_type, - software='EOS', - version='unset', + software="EOS", + version="unset", token=token, - hash_method='sha512sum') + hash_method="sha512sum", + ) my_download.authenticate() if branch is None: branch = str(my_download.latest_branch(rtype=release_type).branch) latest_version = my_download.latest_eos(branch, rtype=release_type) if str(latest_version) == BASE_VERSION_STR: - console.print(f'[red]Error[/red], cannot find any version in {branch} for {release_type} release type') + console.print( + f"[red]Error[/red], cannot find any version in {branch} for {release_type} release type" + ) sys.exit(1) my_download.version = str(latest_version) @@ -92,46 +180,71 @@ def eos( my_download.download_local(file_path=output, checksum=True) if import_docker: - my_download.docker_import( - image_name=docker_name - ) - console.print('✅ processing done !') + my_download.docker_import(image_name=docker_name) + console.print("✅ processing done !") sys.exit(0) - @click.command(no_args_is_help=True) @click.pass_context -@click.option('--format', default='upgrade', help='CVP Image type', type=click.Choice(CVP_IMAGE_TYPE), required=True) -@click.option('--version', default=None, help='CVP version', type=str, required=True) -@click.option('--output', default=str(os.path.relpath(os.getcwd(), start=os.curdir)), help='Path to save image', type=click.Path(),show_default=True) -@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False)) -def cvp(ctx: click.Context, version: str, format: str, output: str, log_level: str) -> int: +@click.option( + "--format", + default="upgrade", + help="CVP Image type", + type=click.Choice(CVP_IMAGE_TYPE), + required=True, +) +@click.option("--version", default=None, help="CVP version", type=str, required=True) +@click.option( + "--output", + default=str(os.path.relpath(os.getcwd(), start=os.curdir)), + help="Path to save image", + type=click.Path(), + show_default=True, +) +@click.option( + "--log-level", + "--log", + help="Logging level of the command", + default=None, + type=click.Choice( + ["debug", "info", "warning", "error", "critical"], case_sensitive=False + ), +) +def cvp( + ctx: click.Context, version: str, format: str, output: str, log_level: str +) -> int: """Download CVP image from Arista website""" console = Console() # Get from Context - token = ctx.obj['token'] - if token is None or token == '': - console.print('❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option', style="bold red") + token = ctx.obj["token"] + if token is None or token == "": + console.print( + "❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option", + style="bold red", + ) sys.exit(1) logger.remove() if log_level is not None: logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper()) - console.print("🪐 [bold blue]eos-downloader[/bold blue] is starting...", ) - console.print(f' - Image Type: {format}') - console.print(f' - Version: {version}') + console.print( + "🪐 [bold blue]eos-downloader[/bold blue] is starting...", + ) + console.print(f" - Image Type: {format}") + console.print(f" - Version: {version}") my_download = eos_downloader.eos.EOSDownloader( image=format, - software='CloudVision', + software="CloudVision", version=version, token=token, - hash_method='md5sum') + hash_method="md5sum", + ) my_download.authenticate() my_download.download_local(file_path=output, checksum=False) - console.print('✅ processing done !') + console.print("✅ processing done !") sys.exit(0) diff --git a/eos_downloader/cli/info/commands.py b/eos_downloader/cli/info/commands.py index b51003b..64097a1 100644 --- a/eos_downloader/cli/info/commands.py +++ b/eos_downloader/cli/info/commands.py @@ -24,12 +24,53 @@ from eos_downloader.models.version import BASE_VERSION_STR, RTYPE_FEATURE, RTYPE @click.command(no_args_is_help=True) @click.pass_context -@click.option('--latest', '-l', is_flag=True, type=click.BOOL, default=False, help='Get latest version in given branch. If --branch is not use, get the latest branch with specific release type') -@click.option('--release-type', '-rtype', type=click.Choice(RTYPES, case_sensitive=False), default=RTYPE_FEATURE, help='EOS release type to search') -@click.option('--branch', '-b', type=click.STRING, default=None, help='EOS Branch to list releases') -@click.option('--verbose', '-v', is_flag=True, type=click.BOOL, default=False, help='Human readable output. Default is none to use output in script)') -@click.option('--log-level', '--log', help='Logging level of the command', default='warning', type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False)) -def eos_versions(ctx: click.Context, log_level: str, branch: Union[str,None] = None, release_type: str = RTYPE_FEATURE, latest: bool = False, verbose: bool = False) -> None: +@click.option( + "--latest", + "-l", + is_flag=True, + type=click.BOOL, + default=False, + help="Get latest version in given branch. If --branch is not use, get the latest branch with specific release type", +) +@click.option( + "--release-type", + "-rtype", + type=click.Choice(RTYPES, case_sensitive=False), + default=RTYPE_FEATURE, + help="EOS release type to search", +) +@click.option( + "--branch", + "-b", + type=click.STRING, + default=None, + help="EOS Branch to list releases", +) +@click.option( + "--verbose", + "-v", + is_flag=True, + type=click.BOOL, + default=False, + help="Human readable output. Default is none to use output in script)", +) +@click.option( + "--log-level", + "--log", + help="Logging level of the command", + default="warning", + type=click.Choice( + ["debug", "info", "warning", "error", "critical"], case_sensitive=False + ), +) +def eos_versions( + ctx: click.Context, + log_level: str, + branch: Union[str, None] = None, + release_type: str = RTYPE_FEATURE, + latest: bool = False, + verbose: bool = False, +) -> None: # pylint: disable = too-many-branches """ List Available EOS version on Arista.com website. @@ -42,22 +83,23 @@ def eos_versions(ctx: click.Context, log_level: str, branch: Union[str,None] = N """ console = Console() # Get from Context - token = ctx.obj['token'] + token = ctx.obj["token"] logger.remove() if log_level is not None: logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper()) my_download = eos_downloader.eos.EOSDownloader( - image='unset', - software='EOS', - version='unset', + image="unset", + software="EOS", + version="unset", token=token, - hash_method='sha512sum') + hash_method="sha512sum", + ) auth = my_download.authenticate() if verbose and auth: - console.print('✅ Authenticated on arista.com') + console.print("✅ Authenticated on arista.com") if release_type is not None: release_type = release_type.upper() @@ -67,21 +109,27 @@ def eos_versions(ctx: click.Context, log_level: str, branch: Union[str,None] = N branch = str(my_download.latest_branch(rtype=release_type).branch) latest_version = my_download.latest_eos(branch, rtype=release_type) if str(latest_version) == BASE_VERSION_STR: - console.print(f'[red]Error[/red], cannot find any version in {branch} for {release_type} release type') + console.print( + f"[red]Error[/red], cannot find any version in {branch} for {release_type} release type" + ) sys.exit(1) if verbose: - console.print(f'Branch {branch} has been selected with release type {release_type}') + console.print( + f"Branch {branch} has been selected with release type {release_type}" + ) if branch is not None: - console.print(f'Latest release for {branch}: {latest_version}') + console.print(f"Latest release for {branch}: {latest_version}") else: - console.print(f'Latest EOS release: {latest_version}') + console.print(f"Latest EOS release: {latest_version}") else: - console.print(f'{ latest_version }') + console.print(f"{ latest_version }") else: versions = my_download.get_eos_versions(branch=branch, rtype=release_type) if verbose: - console.print(f'List of available versions for {branch if branch is not None else "all branches"}') + console.print( + f'List of available versions for {branch if branch is not None else "all branches"}' + ) for version in versions: - console.print(f' → {str(version)}') + console.print(f" → {str(version)}") else: pprint([str(version) for version in versions]) diff --git a/eos_downloader/cli/utils.py b/eos_downloader/cli/utils.py new file mode 100644 index 0000000..4a14f53 --- /dev/null +++ b/eos_downloader/cli/utils.py @@ -0,0 +1,38 @@ +#!/usr/bin/python +# coding: utf-8 -*- +# pylint: disable=inconsistent-return-statements + + +""" +Extension for the python ``click`` module +to provide a group or command with aliases. +""" + + +from typing import Any +import click + + +class AliasedGroup(click.Group): + """ + Implements a subclass of Group that accepts a prefix for a command. + If there were a command called push, it would accept pus as an alias (so long as it was unique) + """ + def get_command(self, ctx: click.Context, cmd_name: str) -> Any: + """Documentation to build""" + rv = click.Group.get_command(self, ctx, cmd_name) + if rv is not None: + return rv + matches = [x for x in self.list_commands(ctx) + if x.startswith(cmd_name)] + if not matches: + return None + if len(matches) == 1: + return click.Group.get_command(self, ctx, matches[0]) + ctx.fail(f"Too many matches: {', '.join(sorted(matches))}") + + def resolve_command(self, ctx: click.Context, args: Any) -> Any: + """Documentation to build""" + # always return the full command name + _, cmd, args = super().resolve_command(ctx, args) + return cmd.name, cmd, args diff --git a/eos_downloader/cvp.py b/eos_downloader/cvp.py index 6f14eb0..678daa8 100644 --- a/eos_downloader/cvp.py +++ b/eos_downloader/cvp.py @@ -6,11 +6,12 @@ CVP Uploader content """ import os -from typing import List, Optional, Any from dataclasses import dataclass -from loguru import logger +from typing import Any, List, Optional + from cvprac.cvp_client import CvpClient from cvprac.cvp_client_errors import CvpLoginError +from loguru import logger # from eos_downloader.tools import exc_to_str @@ -20,8 +21,9 @@ from cvprac.cvp_client_errors import CvpLoginError @dataclass class CvpAuthenticationItem: """ - Data structure to represent Cloudvision Authentication + Data structure to represent Cloudvision Authentication """ + server: str port: int = 443 token: Optional[str] = None @@ -29,15 +31,16 @@ class CvpAuthenticationItem: validate_cert: bool = False -class Filer(): +class Filer: # pylint: disable=too-few-public-methods """ Filer Helper for file management """ + def __init__(self, path: str) -> None: self.file_exist = False - self.filename = '' - self.absolute_path = '' + self.filename = "" + self.absolute_path = "" self.relative_path = path if os.path.exists(path): self.file_exist = True @@ -45,13 +48,14 @@ class Filer(): self.absolute_path = os.path.realpath(path) def __repr__(self) -> str: - return self.absolute_path if self.file_exist else '' + return self.absolute_path if self.file_exist else "" -class CvFeatureManager(): +class CvFeatureManager: """ CvFeatureManager Object to interect with Cloudvision """ + def __init__(self, authentication: CvpAuthenticationItem) -> None: """ __init__ Class Creator @@ -86,19 +90,21 @@ class CvFeatureManager(): try: client.connect( nodes=[authentication.server], - username='', - password='', + username="", + password="", api_token=authentication.token, is_cvaas=True, port=authentication.port, cert=authentication.validate_cert, - request_timeout=authentication.timeout + request_timeout=authentication.timeout, ) except CvpLoginError as error_data: - logger.error(f'Cannot connect to Cloudvision server {authentication.server}') - logger.debug(f'Error message: {error_data}') - logger.info('connected to Cloudvision server') - logger.debug(f'Connection info: {authentication}') + logger.error( + f"Cannot connect to Cloudvision server {authentication.server}" + ) + logger.debug(f"Error message: {error_data}") + logger.info("connected to Cloudvision server") + logger.debug(f"Connection info: {authentication}") return client def __get_images(self) -> List[Any]: @@ -111,8 +117,8 @@ class CvFeatureManager(): Fact returned by Cloudvision """ images = [] - logger.debug(' -> Collecting images') - images = self._cv_instance.api.get_images()['data'] + logger.debug(" -> Collecting images") + images = self._cv_instance.api.get_images()["data"] return images if self.__check_api_result(images) else [] # def __get_bundles(self): @@ -161,7 +167,11 @@ class CvFeatureManager(): bool True if present """ - return any(image_name == image['name'] for image in self._cv_images) if isinstance(self._cv_images, list) else False + return ( + any(image_name == image["name"] for image in self._cv_images) + if isinstance(self._cv_images, list) + else False + ) def _does_bundle_exist(self, bundle_name: str) -> bool: # pylint: disable=unused-argument @@ -192,19 +202,23 @@ class CvFeatureManager(): """ image_item = Filer(path=image_path) if image_item.file_exist is False: - logger.error(f'File not found: {image_item.relative_path}') + logger.error(f"File not found: {image_item.relative_path}") return False - logger.info(f'File path for image: {image_item}') + logger.info(f"File path for image: {image_item}") if self._does_image_exist(image_name=image_item.filename): - logger.error("Image found in Cloudvision , Please delete it before running this script") + logger.error( + "Image found in Cloudvision , Please delete it before running this script" + ) return False try: - upload_result = self._cv_instance.api.add_image(filepath=image_item.absolute_path) + upload_result = self._cv_instance.api.add_image( + filepath=image_item.absolute_path + ) except Exception as e: # pylint: disable=broad-exception-caught - logger.error('An error occurred during upload, check CV connection') - logger.error(f'Exception message is: {e}') + logger.error("An error occurred during upload, check CV connection") + logger.error(f"Exception message is: {e}") return False - logger.debug(f'Upload Result is : {upload_result}') + logger.debug(f"Upload Result is : {upload_result}") return True def build_image_list(self, image_list: List[str]) -> List[Any]: @@ -252,25 +266,30 @@ class CvFeatureManager(): bool True if succeeds """ - logger.debug(f'Init creation of an image bundle {name} with following images {images_name}') + logger.debug( + f"Init creation of an image bundle {name} with following images {images_name}" + ) all_images_present: List[bool] = [] self._cv_images = self.__get_images() all_images_present.extend( - self._does_image_exist(image_name=image_name) - for image_name in images_name + self._does_image_exist(image_name=image_name) for image_name in images_name ) # Bundle Create if self._does_bundle_exist(bundle_name=name) is False: - logger.debug(f'Creating image bundle {name} with following images {images_name}') + logger.debug( + f"Creating image bundle {name} with following images {images_name}" + ) images_data = self.build_image_list(image_list=images_name) if images_data is not None: - logger.debug('Images information: {images_data}') + logger.debug("Images information: {images_data}") try: - data = self._cv_instance.api.save_image_bundle(name=name, images=images_data) + data = self._cv_instance.api.save_image_bundle( + name=name, images=images_data + ) except Exception as e: # pylint: disable=broad-exception-caught - logger.critical(f'{e}') + logger.critical(f"{e}") else: logger.debug(data) return True - logger.critical('No data found for images') + logger.critical("No data found for images") return False diff --git a/eos_downloader/data.py b/eos_downloader/data.py index 74f2f8e..ba54b3b 100644 --- a/eos_downloader/data.py +++ b/eos_downloader/data.py @@ -12,82 +12,22 @@ Data are built from content of Arista XML file # [platform][image][version] DATA_MAPPING = { "CloudVision": { - "ova": { - "extension": ".ova", - "prepend": "cvp", - "folder_level": 0 - }, - "rpm": { - "extension": "", - "prepend": "cvp-rpm-installer", - "folder_level": 0 - }, - "kvm": { - "extension": "-kvm.tgz", - "prepend": "cvp", - "folder_level": 0 - }, - "upgrade": { - "extension": ".tgz", - "prepend": "cvp-upgrade", - "folder_level": 0 - }, + "ova": {"extension": ".ova", "prepend": "cvp", "folder_level": 0}, + "rpm": {"extension": "", "prepend": "cvp-rpm-installer", "folder_level": 0}, + "kvm": {"extension": "-kvm.tgz", "prepend": "cvp", "folder_level": 0}, + "upgrade": {"extension": ".tgz", "prepend": "cvp-upgrade", "folder_level": 0}, }, "EOS": { - "64": { - "extension": ".swi", - "prepend": "EOS64", - "folder_level": 0 - }, - "INT": { - "extension": "-INT.swi", - "prepend": "EOS", - "folder_level": 1 - }, - "2GB-INT": { - "extension": "-INT.swi", - "prepend": "EOS-2GB", - "folder_level": 1 - }, - "cEOS": { - "extension": ".tar.xz", - "prepend": "cEOS-lab", - "folder_level": 0 - }, - "cEOS64": { - "extension": ".tar.xz", - "prepend": "cEOS64-lab", - "folder_level": 0 - }, - "vEOS": { - "extension": ".vmdk", - "prepend": "vEOS", - "folder_level": 0 - }, - "vEOS-lab": { - "extension": ".vmdk", - "prepend": "vEOS-lab", - "folder_level": 0 - }, - "EOS-2GB": { - "extension": ".swi", - "prepend": "EOS-2GB", - "folder_level": 0 - }, - "RN": { - "extension": "-", - "prepend": "RN", - "folder_level": 0 - }, - "SOURCE": { - "extension": "-source.tar", - "prepend": "EOS", - "folder_level": 0 - }, - "default": { - "extension": ".swi", - "prepend": "EOS", - "folder_level": 0 - } - } + "64": {"extension": ".swi", "prepend": "EOS64", "folder_level": 0}, + "INT": {"extension": "-INT.swi", "prepend": "EOS", "folder_level": 1}, + "2GB-INT": {"extension": "-INT.swi", "prepend": "EOS-2GB", "folder_level": 1}, + "cEOS": {"extension": ".tar.xz", "prepend": "cEOS-lab", "folder_level": 0}, + "cEOS64": {"extension": ".tar.xz", "prepend": "cEOS64-lab", "folder_level": 0}, + "vEOS": {"extension": ".vmdk", "prepend": "vEOS", "folder_level": 0}, + "vEOS-lab": {"extension": ".vmdk", "prepend": "vEOS-lab", "folder_level": 0}, + "EOS-2GB": {"extension": ".swi", "prepend": "EOS-2GB", "folder_level": 0}, + "RN": {"extension": "-", "prepend": "RN", "folder_level": 0}, + "SOURCE": {"extension": "-source.tar", "prepend": "EOS", "folder_level": 0}, + "default": {"extension": ".swi", "prepend": "EOS", "folder_level": 0}, + }, } diff --git a/eos_downloader/download.py b/eos_downloader/download.py index 2297b04..2c9576e 100644 --- a/eos_downloader/download.py +++ b/eos_downloader/download.py @@ -8,13 +8,20 @@ import os.path import signal from concurrent.futures import ThreadPoolExecutor from threading import Event -from typing import Iterable, Any +from typing import Any, Iterable import requests import rich from rich import console -from rich.progress import (BarColumn, DownloadColumn, Progress, TaskID, - TextColumn, TimeElapsedColumn, TransferSpeedColumn) +from rich.progress import ( + BarColumn, + DownloadColumn, + Progress, + TaskID, + TextColumn, + TimeElapsedColumn, + TransferSpeedColumn, +) console = rich.get_console() done_event = Event() @@ -28,7 +35,7 @@ def handle_sigint(signum: Any, frame: Any) -> None: signal.signal(signal.SIGINT, handle_sigint) -class DownloadProgressBar(): +class DownloadProgressBar: """ Object to manage Download process with Progress Bar from Rich """ @@ -38,7 +45,9 @@ class DownloadProgressBar(): Class Constructor """ self.progress = Progress( - TextColumn("💾 Downloading [bold blue]{task.fields[filename]}", justify="right"), + TextColumn( + "💾 Downloading [bold blue]{task.fields[filename]}", justify="right" + ), BarColumn(bar_width=None), "[progress.percentage]{task.percentage:>3.1f}%", "•", @@ -48,14 +57,16 @@ class DownloadProgressBar(): "•", TimeElapsedColumn(), "•", - console=console + console=console, ) - def _copy_url(self, task_id: TaskID, url: str, path: str, block_size: int = 1024) -> bool: + def _copy_url( + self, task_id: TaskID, url: str, path: str, block_size: int = 1024 + ) -> bool: """Copy data from a url to a local file.""" response = requests.get(url, stream=True, timeout=5) # This will break if the response doesn't contain content length - self.progress.update(task_id, total=int(response.headers['Content-Length'])) + self.progress.update(task_id, total=int(response.headers["Content-Length"])) with open(path, "wb") as dest_file: self.progress.start_task(task_id) for data in response.iter_content(chunk_size=block_size): @@ -71,7 +82,9 @@ class DownloadProgressBar(): with self.progress: with ThreadPoolExecutor(max_workers=4) as pool: for url in urls: - filename = url.split("/")[-1].split('?')[0] + filename = url.split("/")[-1].split("?")[0] dest_path = os.path.join(dest_dir, filename) - task_id = self.progress.add_task("download", filename=filename, start=False) + task_id = self.progress.add_task( + "download", filename=filename, start=False + ) pool.submit(self._copy_url, task_id, url, dest_path) diff --git a/eos_downloader/eos.py b/eos_downloader/eos.py index e5f3670..716992f 100644 --- a/eos_downloader/eos.py +++ b/eos_downloader/eos.py @@ -14,13 +14,20 @@ import rich from loguru import logger from rich import console -from eos_downloader.models.version import BASE_BRANCH_STR, BASE_VERSION_STR, REGEX_EOS_VERSION, RTYPE_FEATURE, EosVersion +from eos_downloader.models.version import ( + BASE_BRANCH_STR, + BASE_VERSION_STR, + REGEX_EOS_VERSION, + RTYPE_FEATURE, + EosVersion, +) from eos_downloader.object_downloader import ObjectDownloader # logger = logging.getLogger(__name__) console = rich.get_console() + class EOSDownloader(ObjectDownloader): """ EOSDownloader Object to download EOS images from Arista.com website @@ -47,22 +54,27 @@ class EOSDownloader(ObjectDownloader): file_path : str Path where EOS image is located """ - logger.info('Mounting volume to disable ZTP') - console.print('🚀 Mounting volume to disable ZTP') + logger.info("Mounting volume to disable ZTP") + console.print("🚀 Mounting volume to disable ZTP") raw_folder = os.path.join(file_path, "raw") os.system(f"rm -rf {raw_folder}") os.system(f"mkdir -p {raw_folder}") os.system( - f'guestmount -a {os.path.join(file_path, "hda.qcow2")} -m /dev/sda2 {os.path.join(file_path, "raw")}') - ztp_file = os.path.join(file_path, 'raw/zerotouch-config') - with open(ztp_file, 'w', encoding='ascii') as zfile: - zfile.write('DISABLE=True') - logger.info(f'Unmounting volume in {file_path}') + f'guestmount -a {os.path.join(file_path, "hda.qcow2")} -m /dev/sda2 {os.path.join(file_path, "raw")}' + ) + ztp_file = os.path.join(file_path, "raw/zerotouch-config") + with open(ztp_file, "w", encoding="ascii") as zfile: + zfile.write("DISABLE=True") + logger.info(f"Unmounting volume in {file_path}") os.system(f"guestunmount {os.path.join(file_path, 'raw')}") os.system(f"rm -rf {os.path.join(file_path, 'raw')}") logger.info(f"Volume has been successfully unmounted at {file_path}") - def _parse_xml_for_version(self,root_xml: ET.ElementTree, xpath: str = './/dir[@label="Active Releases"]/dir/dir/[@label]') -> List[EosVersion]: + def _parse_xml_for_version( + self, + root_xml: ET.ElementTree, + xpath: str = './/dir[@label="Active Releases"]/dir/dir/[@label]', + ) -> List[EosVersion]: """ Extract list of available EOS versions from Arista.com website @@ -77,19 +89,21 @@ class EOSDownloader(ObjectDownloader): """ # XPATH: .//dir[@label="Active Releases"]/dir/dir/[@label] if self.eos_versions is None: - logger.debug(f'Using xpath {xpath}') + logger.debug(f"Using xpath {xpath}") eos_versions = [] for node in root_xml.findall(xpath): - if 'label' in node.attrib and node.get('label') is not None: - label = node.get('label') + if "label" in node.attrib and node.get("label") is not None: + label = node.get("label") if label is not None and REGEX_EOS_VERSION.match(label): eos_version = EosVersion.from_str(label) eos_versions.append(eos_version) logger.debug(f"Found {label} - {eos_version}") - logger.debug(f'List of versions found on arista.com is: {eos_versions}') + logger.debug(f"List of versions found on arista.com is: {eos_versions}") self.eos_versions = eos_versions else: - logger.debug('receiving instruction to download versions, but already available') + logger.debug( + "receiving instruction to download versions, but already available" + ) return self.eos_versions def _get_branches(self, with_rtype: str = RTYPE_FEATURE) -> List[str]: @@ -104,9 +118,11 @@ class EOSDownloader(ObjectDownloader): Returns: List[str]: A lsit of string that represent all availables EOS branches """ - root = self._get_folder_tree() + root = self.get_folder_tree() versions = self._parse_xml_for_version(root_xml=root) - return list({version.branch for version in versions if version.rtype == with_rtype}) + return list( + {version.branch for version in versions if version.rtype == with_rtype} + ) def latest_branch(self, rtype: str = RTYPE_FEATURE) -> EosVersion: """ @@ -125,7 +141,9 @@ class EOSDownloader(ObjectDownloader): selected_branch = branch return selected_branch - def get_eos_versions(self, branch: Union[str,None] = None, rtype: Union[str,None] = None) -> List[EosVersion]: + def get_eos_versions( + self, branch: Union[str, None] = None, rtype: Union[str, None] = None + ) -> List[EosVersion]: """ Get a list of available EOS version available on arista.com @@ -139,16 +157,22 @@ class EOSDownloader(ObjectDownloader): Returns: List[EosVersion]: A list of versions available """ - root = self._get_folder_tree() + root = self.get_folder_tree() result = [] for version in self._parse_xml_for_version(root_xml=root): if branch is None and (version.rtype == rtype or rtype is None): result.append(version) - elif branch is not None and version.is_in_branch(branch) and version.rtype == rtype: + elif ( + branch is not None + and version.is_in_branch(branch) + and version.rtype == rtype + ): result.append(version) return result - def latest_eos(self, branch: Union[str,None] = None, rtype: str = RTYPE_FEATURE) -> EosVersion: + def latest_eos( + self, branch: Union[str, None] = None, rtype: str = RTYPE_FEATURE + ) -> EosVersion: """ Get latest version of EOS @@ -168,7 +192,9 @@ class EOSDownloader(ObjectDownloader): latest_branch = self.latest_branch(rtype=rtype) else: latest_branch = EosVersion.from_str(branch) - for version in self.get_eos_versions(branch=str(latest_branch.branch), rtype=rtype): + for version in self.get_eos_versions( + branch=str(latest_branch.branch), rtype=rtype + ): if version > selected_version: if rtype is not None and version.rtype == rtype: selected_version = version diff --git a/eos_downloader/models/version.py b/eos_downloader/models/version.py index 4b051a5..22de100 100644 --- a/eos_downloader/models/version.py +++ b/eos_downloader/models/version.py @@ -16,11 +16,11 @@ from eos_downloader.tools import exc_to_str # logger = logging.getLogger(__name__) -BASE_VERSION_STR = '4.0.0F' -BASE_BRANCH_STR = '4.0' +BASE_VERSION_STR = "4.0.0F" +BASE_BRANCH_STR = "4.0" -RTYPE_FEATURE = 'F' -RTYPE_MAINTENANCE = 'M' +RTYPE_FEATURE = "F" +RTYPE_MAINTENANCE = "M" RTYPES = [RTYPE_FEATURE, RTYPE_MAINTENANCE] # Regular Expression to capture multiple EOS version format @@ -29,8 +29,12 @@ RTYPES = [RTYPE_FEATURE, RTYPE_MAINTENANCE] # 4.21.1M # 4.28.10.F # 4.28.6.1M -REGEX_EOS_VERSION = re.compile(r"^.*(?P<major>4)\.(?P<minor>\d{1,2})\.(?P<patch>\d{1,2})(?P<other>\.\d*)*(?P<rtype>[M,F])*$") -REGEX_EOS_BRANCH = re.compile(r"^.*(?P<major>4)\.(?P<minor>\d{1,2})(\.?P<patch>\d)*(\.\d)*(?P<rtype>[M,F])*$") +REGEX_EOS_VERSION = re.compile( + r"^.*(?P<major>4)\.(?P<minor>\d{1,2})\.(?P<patch>\d{1,2})(?P<other>\.\d*)*(?P<rtype>[M,F])*$" +) +REGEX_EOS_BRANCH = re.compile( + r"^.*(?P<major>4)\.(?P<minor>\d{1,2})(\.?P<patch>\d)*(\.\d)*(?P<rtype>[M,F])*$" +) class EosVersion(BaseModel): @@ -59,10 +63,11 @@ class EosVersion(BaseModel): Args: BaseModel (Pydantic): Pydantic Base Model """ + major: int = 4 minor: int = 0 patch: int = 0 - rtype: Optional[str] = 'F' + rtype: Optional[str] = "F" other: Any = None @classmethod @@ -84,7 +89,7 @@ class EosVersion(BaseModel): Returns: EosVersion object """ - logger.debug(f'receiving version: {eos_version}') + logger.debug(f"receiving version: {eos_version}") if REGEX_EOS_VERSION.match(eos_version): matches = REGEX_EOS_VERSION.match(eos_version) # assert matches is not None @@ -95,7 +100,7 @@ class EosVersion(BaseModel): # assert matches is not None assert matches is not None return cls(**matches.groupdict()) - logger.error(f'Error occured with {eos_version}') + logger.error(f"Error occured with {eos_version}") return EosVersion() @property @@ -106,7 +111,7 @@ class EosVersion(BaseModel): Returns: str: branch from version """ - return f'{self.major}.{self.minor}' + return f"{self.major}.{self.minor}" def __str__(self) -> str: """ @@ -118,8 +123,8 @@ class EosVersion(BaseModel): str: A standard EOS version string representing <MAJOR>.<MINOR>.<PATCH><RTYPE> """ if self.other is None: - return f'{self.major}.{self.minor}.{self.patch}{self.rtype}' - return f'{self.major}.{self.minor}.{self.patch}{self.other}{self.rtype}' + return f"{self.major}.{self.minor}.{self.patch}{self.rtype}" + return f"{self.major}.{self.minor}.{self.patch}{self.other}{self.rtype}" def _compare(self, other: EosVersion) -> float: """ @@ -141,58 +146,68 @@ class EosVersion(BaseModel): float: -1 if ver1 < ver2, 0 if ver1 == ver2, 1 if ver1 > ver2 """ if not isinstance(other, EosVersion): - raise ValueError(f'could not compare {other} as it is not an EosVersion object') + raise ValueError( + f"could not compare {other} as it is not an EosVersion object" + ) comparison_flag: float = 0 - logger.warning(f'current version {self.__str__()} - other {str(other)}') # pylint: disable = unnecessary-dunder-call + logger.warning( + f"current version {self.__str__()} - other {str(other)}" # pylint: disable = unnecessary-dunder-call + ) for key, _ in self.dict().items(): - if comparison_flag == 0 and self.dict()[key] is None or other.dict()[key] is None: - logger.debug(f'{key}: local None - remote None') - logger.debug(f'{key}: local {self.dict()} - remote {other.dict()}') + if ( + comparison_flag == 0 + and self.dict()[key] is None + or other.dict()[key] is None + ): + logger.debug(f"{key}: local None - remote None") + logger.debug(f"{key}: local {self.dict()} - remote {other.dict()}") return comparison_flag - logger.debug(f'{key}: local {self.dict()[key]} - remote {other.dict()[key]}') + logger.debug( + f"{key}: local {self.dict()[key]} - remote {other.dict()[key]}" + ) if comparison_flag == 0 and self.dict()[key] < other.dict()[key]: comparison_flag = -1 if comparison_flag == 0 and self.dict()[key] > other.dict()[key]: comparison_flag = 1 if comparison_flag != 0: - logger.info(f'comparison result is {comparison_flag}') + logger.info(f"comparison result is {comparison_flag}") return comparison_flag - logger.info(f'comparison result is {comparison_flag}') + logger.info(f"comparison result is {comparison_flag}") return comparison_flag @typing.no_type_check def __eq__(self, other): - """ Implement __eq__ function (==) """ + """Implement __eq__ function (==)""" return self._compare(other) == 0 @typing.no_type_check def __ne__(self, other): # type: ignore - """ Implement __nw__ function (!=) """ + """Implement __nw__ function (!=)""" return self._compare(other) != 0 @typing.no_type_check def __lt__(self, other): # type: ignore - """ Implement __lt__ function (<) """ + """Implement __lt__ function (<)""" return self._compare(other) < 0 @typing.no_type_check def __le__(self, other): # type: ignore - """ Implement __le__ function (<=) """ + """Implement __le__ function (<=)""" return self._compare(other) <= 0 @typing.no_type_check def __gt__(self, other): # type: ignore - """ Implement __gt__ function (>) """ + """Implement __gt__ function (>)""" return self._compare(other) > 0 @typing.no_type_check def __ge__(self, other): # type: ignore - """ Implement __ge__ function (>=) """ + """Implement __ge__ function (>=)""" return self._compare(other) >= 0 def match(self, match_expr: str) -> bool: @@ -236,7 +251,7 @@ class EosVersion(BaseModel): "['<', '>', '==', '<=', '>=', '!=']. " f"You provided: {match_expr}" ) - logger.debug(f'work on comparison {prefix} with base release {match_version}') + logger.debug(f"work on comparison {prefix} with base release {match_version}") possibilities_dict = { ">": (1,), "<": (-1,), @@ -263,7 +278,7 @@ class EosVersion(BaseModel): bool: True if current version is in provided branch, otherwise False """ try: - logger.debug(f'reading branch str:{branch_str}') + logger.debug(f"reading branch str:{branch_str}") branch = EosVersion.from_str(branch_str) except Exception as error: # pylint: disable = broad-exception-caught logger.error(exc_to_str(error)) diff --git a/eos_downloader/object_downloader.py b/eos_downloader/object_downloader.py index 0420acb..d7b1418 100644 --- a/eos_downloader/object_downloader.py +++ b/eos_downloader/object_downloader.py @@ -8,8 +8,13 @@ eos_downloader class definition """ -from __future__ import (absolute_import, division, print_function, - unicode_literals, annotations) +from __future__ import ( + absolute_import, + annotations, + division, + print_function, + unicode_literals, +) import base64 import glob @@ -26,9 +31,14 @@ from loguru import logger from rich import console from tqdm import tqdm -from eos_downloader import (ARISTA_DOWNLOAD_URL, ARISTA_GET_SESSION, - ARISTA_SOFTWARE_FOLDER_TREE, EVE_QEMU_FOLDER_PATH, - MSG_INVALID_DATA, MSG_TOKEN_EXPIRED) +from eos_downloader import ( + ARISTA_DOWNLOAD_URL, + ARISTA_GET_SESSION, + ARISTA_SOFTWARE_FOLDER_TREE, + EVE_QEMU_FOLDER_PATH, + MSG_INVALID_DATA, + MSG_TOKEN_EXPIRED, +) from eos_downloader.data import DATA_MAPPING from eos_downloader.download import DownloadProgressBar @@ -37,11 +47,19 @@ from eos_downloader.download import DownloadProgressBar console = rich.get_console() -class ObjectDownloader(): +class ObjectDownloader: """ ObjectDownloader Generic Object to download from Arista.com """ - def __init__(self, image: str, version: str, token: str, software: str = 'EOS', hash_method: str = 'md5sum'): + + def __init__( + self, + image: str, + version: str, + token: str, + software: str = "EOS", + hash_method: str = "md5sum", + ): """ __init__ Class constructor @@ -70,10 +88,10 @@ class ObjectDownloader(): self.hash_method = hash_method self.timeout = 5 # Logging - logger.debug(f'Filename built by _build_filename is {self.filename}') + logger.debug(f"Filename built by _build_filename is {self.filename}") def __str__(self) -> str: - return f'{self.software} - {self.image} - {self.version}' + return f"{self.software} - {self.image} - {self.version}" # def __repr__(self): # return str(self.__dict__) @@ -102,16 +120,18 @@ class ObjectDownloader(): str: Filename to search for on Arista.com """ - logger.info('start build') + logger.info("start build") if self.software in DATA_MAPPING: - logger.info(f'software in data mapping: {self.software}') + logger.info(f"software in data mapping: {self.software}") if self.image in DATA_MAPPING[self.software]: - logger.info(f'image in data mapping: {self.image}') + logger.info(f"image in data mapping: {self.image}") return f"{DATA_MAPPING[self.software][self.image]['prepend']}-{self.version}{DATA_MAPPING[self.software][self.image]['extension']}" return f"{DATA_MAPPING[self.software]['default']['prepend']}-{self.version}{DATA_MAPPING[self.software]['default']['extension']}" - raise ValueError(f'Incorrect value for software {self.software}') + raise ValueError(f"Incorrect value for software {self.software}") - def _parse_xml_for_path(self, root_xml: ET.ElementTree, xpath: str, search_file: str) -> str: + def _parse_xml_for_path( + self, root_xml: ET.ElementTree, xpath: str, search_file: str + ) -> str: # sourcery skip: remove-unnecessary-cast """ _parse_xml Read and extract data from XML using XPATH @@ -132,18 +152,18 @@ class ObjectDownloader(): str File Path on Arista server side """ - logger.debug(f'Using xpath {xpath}') - logger.debug(f'Search for file {search_file}') - console.print(f'🔎 Searching file {search_file}') + logger.debug(f"Using xpath {xpath}") + logger.debug(f"Search for file {search_file}") + console.print(f"🔎 Searching file {search_file}") for node in root_xml.findall(xpath): # logger.debug('Found {}', node.text) if str(node.text).lower() == search_file.lower(): - path = node.get('path') - console.print(f' -> Found file at {path}') + path = node.get("path") + console.print(f" -> Found file at {path}") logger.info(f'Found {node.text} at {node.get("path")}') - return str(node.get('path')) if node.get('path') is not None else '' - logger.error(f'Requested file ({self.filename}) not found !') - return '' + return str(node.get("path")) if node.get("path") is not None else "" + logger.error(f"Requested file ({self.filename}) not found !") + return "" def _get_hash(self, file_path: str) -> str: """ @@ -165,10 +185,10 @@ class ObjectDownloader(): dl_rich_progress_bar = DownloadProgressBar() dl_rich_progress_bar.download(urls=[hash_url], dest_dir=file_path) hash_downloaded = f"{file_path}/{os.path.basename(remote_hash_file)}" - hash_content = 'unset' - with open(hash_downloaded, 'r', encoding='utf-8') as f: + hash_content = "unset" + with open(hash_downloaded, "r", encoding="utf-8") as f: hash_content = f.read() - return hash_content.split(' ')[0] + return hash_content.split(" ")[0] @staticmethod def _compute_hash_md5sum(file: str, hash_expected: str) -> bool: @@ -195,7 +215,9 @@ class ObjectDownloader(): hash_md5.update(chunk) if hash_md5.hexdigest() == hash_expected: return True - logger.warning(f'Downloaded file is corrupt: local md5 ({hash_md5.hexdigest()}) is different to md5 from arista ({hash_expected})') + logger.warning( + f"Downloaded file is corrupt: local md5 ({hash_md5.hexdigest()}) is different to md5 from arista ({hash_expected})" + ) return False @staticmethod @@ -223,10 +245,12 @@ class ObjectDownloader(): hash_sha512.update(chunk) if hash_sha512.hexdigest() == hash_expected: return True - logger.warning(f'Downloaded file is corrupt: local sha512 ({hash_sha512.hexdigest()}) is different to sha512 from arista ({hash_expected})') + logger.warning( + f"Downloaded file is corrupt: local sha512 ({hash_sha512.hexdigest()}) is different to sha512 from arista ({hash_expected})" + ) return False - def _get_folder_tree(self) -> ET.ElementTree: + def get_folder_tree(self) -> ET.ElementTree: """ _get_folder_tree Download XML tree from Arista server @@ -237,15 +261,17 @@ class ObjectDownloader(): """ if self.session_id is None: self.authenticate() - jsonpost = {'sessionCode': self.session_id} - result = requests.post(ARISTA_SOFTWARE_FOLDER_TREE, data=json.dumps(jsonpost), timeout=self.timeout) + jsonpost = {"sessionCode": self.session_id} + result = requests.post( + ARISTA_SOFTWARE_FOLDER_TREE, data=json.dumps(jsonpost), timeout=self.timeout + ) try: folder_tree = result.json()["data"]["xml"] return ET.ElementTree(ET.fromstring(folder_tree)) except KeyError as error: logger.error(MSG_INVALID_DATA) - logger.error(f'Server returned: {error}') - console.print(f'❌ {MSG_INVALID_DATA}', style="bold red") + logger.error(f"Server returned: {error}") + console.print(f"❌ {MSG_INVALID_DATA}", style="bold red") sys.exit(1) def _get_remote_filepath(self) -> str: @@ -259,12 +285,14 @@ class ObjectDownloader(): str Remote path of the file to download """ - root = self._get_folder_tree() + root = self.get_folder_tree() logger.debug("GET XML content from ARISTA.com") xpath = f'.//dir[@label="{self.software}"]//file' - return self._parse_xml_for_path(root_xml=root, xpath=xpath, search_file=self.filename) + return self._parse_xml_for_path( + root_xml=root, xpath=xpath, search_file=self.filename + ) - def _get_remote_hashpath(self, hash_method: str = 'md5sum') -> str: + def _get_remote_hashpath(self, hash_method: str = "md5sum") -> str: """ _get_remote_hashpath Helper to get path of the hash's file to download @@ -275,16 +303,16 @@ class ObjectDownloader(): str Remote path of the hash's file to download """ - root = self._get_folder_tree() + root = self.get_folder_tree() logger.debug("GET XML content from ARISTA.com") xpath = f'.//dir[@label="{self.software}"]//file' return self._parse_xml_for_path( root_xml=root, xpath=xpath, - search_file=f'{self.filename}.{hash_method}', + search_file=f"{self.filename}.{hash_method}", ) - def _get_url(self, remote_file_path: str) -> str: + def _get_url(self, remote_file_path: str) -> str: """ _get_url Get URL to use for downloading file from Arista server @@ -302,13 +330,15 @@ class ObjectDownloader(): """ if self.session_id is None: self.authenticate() - jsonpost = {'sessionCode': self.session_id, 'filePath': remote_file_path} - result = requests.post(ARISTA_DOWNLOAD_URL, data=json.dumps(jsonpost), timeout=self.timeout) - if 'data' in result.json() and 'url' in result.json()['data']: + jsonpost = {"sessionCode": self.session_id, "filePath": remote_file_path} + result = requests.post( + ARISTA_DOWNLOAD_URL, data=json.dumps(jsonpost), timeout=self.timeout + ) + if "data" in result.json() and "url" in result.json()["data"]: # logger.debug('URL to download file is: {}', result.json()) return result.json()["data"]["url"] - logger.critical(f'Server returns following message: {result.json()}') - return '' + logger.critical(f"Server returns following message: {result.json()}") + return "" @staticmethod def _download_file_raw(url: str, file_path: str) -> str: @@ -331,31 +361,40 @@ class ObjectDownloader(): """ chunkSize = 1024 r = requests.get(url, stream=True, timeout=5) - with open(file_path, 'wb') as f: - pbar = tqdm(unit="B", total=int(r.headers['Content-Length']), unit_scale=True, unit_divisor=1024) + with open(file_path, "wb") as f: + pbar = tqdm( + unit="B", + total=int(r.headers["Content-Length"]), + unit_scale=True, + unit_divisor=1024, + ) for chunk in r.iter_content(chunk_size=chunkSize): if chunk: pbar.update(len(chunk)) f.write(chunk) return file_path - def _download_file(self, file_path: str, filename: str, rich_interface: bool = True) -> Union[None, str]: + def _download_file( + self, file_path: str, filename: str, rich_interface: bool = True + ) -> Union[None, str]: remote_file_path = self._get_remote_filepath() - logger.info(f'File found on arista server: {remote_file_path}') + logger.info(f"File found on arista server: {remote_file_path}") file_url = self._get_url(remote_file_path=remote_file_path) if file_url is not False: if not rich_interface: - return self._download_file_raw(url=file_url, file_path=os.path.join(file_path, filename)) + return self._download_file_raw( + url=file_url, file_path=os.path.join(file_path, filename) + ) rich_downloader = DownloadProgressBar() rich_downloader.download(urls=[file_url], dest_dir=file_path) return os.path.join(file_path, filename) - logger.error(f'Cannot download file {file_path}') + logger.error(f"Cannot download file {file_path}") return None @staticmethod def _create_destination_folder(path: str) -> None: # os.makedirs(path, mode, exist_ok=True) - os.system(f'mkdir -p {path}') + os.system(f"mkdir -p {path}") @staticmethod def _disable_ztp(file_path: str) -> None: @@ -379,24 +418,29 @@ class ObjectDownloader(): """ credentials = (base64.b64encode(self.token.encode())).decode("utf-8") session_code_url = ARISTA_GET_SESSION - jsonpost = {'accessToken': credentials} + jsonpost = {"accessToken": credentials} - result = requests.post(session_code_url, data=json.dumps(jsonpost), timeout=self.timeout) + result = requests.post( + session_code_url, data=json.dumps(jsonpost), timeout=self.timeout + ) - if result.json()["status"]["message"] in[ 'Access token expired', 'Invalid access token']: - console.print(f'❌ {MSG_TOKEN_EXPIRED}', style="bold red") + if result.json()["status"]["message"] in [ + "Access token expired", + "Invalid access token", + ]: + console.print(f"❌ {MSG_TOKEN_EXPIRED}", style="bold red") logger.error(MSG_TOKEN_EXPIRED) return False try: - if 'data' in result.json(): + if "data" in result.json(): self.session_id = result.json()["data"]["session_code"] - logger.info('Authenticated on arista.com') + logger.info("Authenticated on arista.com") return True - logger.debug(f'{result.json()}') + logger.debug(f"{result.json()}") return False except KeyError as error_arista: - logger.error(f'Error: {error_arista}') + logger.error(f"Error: {error_arista}") sys.exit(1) def download_local(self, file_path: str, checksum: bool = False) -> bool: @@ -422,25 +466,33 @@ class ObjectDownloader(): bool True if everything went well, False if any problem appears """ - file_downloaded = str(self._download_file(file_path=file_path, filename=self.filename)) + file_downloaded = str( + self._download_file(file_path=file_path, filename=self.filename) + ) # Check file HASH hash_result = False if checksum: - logger.info('🚀 Running checksum validation') - console.print('🚀 Running checksum validation') - if self.hash_method == 'md5sum': + logger.info("🚀 Running checksum validation") + console.print("🚀 Running checksum validation") + if self.hash_method == "md5sum": hash_expected = self._get_hash(file_path=file_path) - hash_result = self._compute_hash_md5sum(file=file_downloaded, hash_expected=hash_expected) - elif self.hash_method == 'sha512sum': + hash_result = self._compute_hash_md5sum( + file=file_downloaded, hash_expected=hash_expected + ) + elif self.hash_method == "sha512sum": hash_expected = self._get_hash(file_path=file_path) - hash_result = self._compute_hash_sh512sum(file=file_downloaded, hash_expected=hash_expected) + hash_result = self._compute_hash_sh512sum( + file=file_downloaded, hash_expected=hash_expected + ) if not hash_result: - logger.error('Downloaded file is corrupted, please check your connection') - console.print('❌ Downloaded file is corrupted, please check your connection') + logger.error("Downloaded file is corrupted, please check your connection") + console.print( + "❌ Downloaded file is corrupted, please check your connection" + ) return False - logger.info('Downloaded file is correct.') - console.print('✅ Downloaded file is correct.') + logger.info("Downloaded file is correct.") + console.print("✅ Downloaded file is correct.") return True def provision_eve(self, noztp: bool = False, checksum: bool = True) -> None: @@ -466,7 +518,7 @@ class ObjectDownloader(): # Build image name to use in folder path eos_image_name = self.filename.rstrip(".vmdk").lower() if noztp: - eos_image_name = f'{eos_image_name}-noztp' + eos_image_name = f"{eos_image_name}-noztp" # Create full path for EVE-NG file_path = os.path.join(EVE_QEMU_FOLDER_PATH, eos_image_name.rstrip()) # Create folders in filesystem @@ -474,20 +526,23 @@ class ObjectDownloader(): # Download file to local destination file_downloaded = self._download_file( - file_path=file_path, filename=self.filename) + file_path=file_path, filename=self.filename + ) # Convert to QCOW2 format file_qcow2 = os.path.join(file_path, "hda.qcow2") - logger.info('Converting VMDK to QCOW2 format') - console.print('🚀 Converting VMDK to QCOW2 format...') + logger.info("Converting VMDK to QCOW2 format") + console.print("🚀 Converting VMDK to QCOW2 format...") - os.system(f'$(which qemu-img) convert -f vmdk -O qcow2 {file_downloaded} {file_qcow2}') + os.system( + f"$(which qemu-img) convert -f vmdk -O qcow2 {file_downloaded} {file_qcow2}" + ) - logger.info('Applying unl_wrapper to fix permissions') - console.print('Applying unl_wrapper to fix permissions') + logger.info("Applying unl_wrapper to fix permissions") + console.print("Applying unl_wrapper to fix permissions") - os.system('/opt/unetlab/wrappers/unl_wrapper -a fixpermissions') - os.system(f'rm -f {file_downloaded}') + os.system("/opt/unetlab/wrappers/unl_wrapper -a fixpermissions") + os.system(f"rm -f {file_downloaded}") if noztp: self._disable_ztp(file_path=file_path) @@ -502,12 +557,12 @@ class ObjectDownloader(): version (str): image_name (str, optional): Image name to use. Defaults to "arista/ceos". """ - docker_image = f'{image_name}:{self.version}' - logger.info(f'Importing image {self.filename} to {docker_image}') - console.print(f'🚀 Importing image {self.filename} to {docker_image}') - os.system(f'$(which docker) import {self.filename} {docker_image}') - for filename in glob.glob(f'{self.filename}*'): + docker_image = f"{image_name}:{self.version}" + logger.info(f"Importing image {self.filename} to {docker_image}") + console.print(f"🚀 Importing image {self.filename} to {docker_image}") + os.system(f"$(which docker) import {self.filename} {docker_image}") + for filename in glob.glob(f"{self.filename}*"): try: os.remove(filename) except FileNotFoundError: - console.print(f'File not found: {filename}') + console.print(f"File not found: {filename}") |