summaryrefslogtreecommitdiffstats
path: root/eos_downloader
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2023-10-10 09:54:17 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2023-10-10 09:54:17 +0000
commitb0554023303fe8656173d3dfbd0d2449145df38e (patch)
treedd749f47419b53ca3a5ec7f7bbe2590b8aafc041 /eos_downloader
parentReleasing debian version 0.8.2-1. (diff)
downloadeos-downloader-b0554023303fe8656173d3dfbd0d2449145df38e.tar.xz
eos-downloader-b0554023303fe8656173d3dfbd0d2449145df38e.zip
Merging upstream version 0.9.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'eos_downloader')
-rw-r--r--eos_downloader/__init__.py27
-rw-r--r--eos_downloader/cli/cli.py48
-rw-r--r--eos_downloader/cli/debug/commands.py41
-rw-r--r--eos_downloader/cli/get/commands.py211
-rw-r--r--eos_downloader/cli/info/commands.py86
-rw-r--r--eos_downloader/cli/utils.py38
-rw-r--r--eos_downloader/cvp.py85
-rw-r--r--eos_downloader/data.py92
-rw-r--r--eos_downloader/download.py33
-rw-r--r--eos_downloader/eos.py68
-rw-r--r--eos_downloader/models/version.py71
-rw-r--r--eos_downloader/object_downloader.py223
12 files changed, 658 insertions, 365 deletions
diff --git a/eos_downloader/__init__.py b/eos_downloader/__init__.py
index 345ccf7..4507a70 100644
--- a/eos_downloader/__init__.py
+++ b/eos_downloader/__init__.py
@@ -5,23 +5,31 @@
EOS Downloader module.
"""
-from __future__ import (absolute_import, division,
- print_function, unicode_literals, annotations)
+from __future__ import (
+ absolute_import,
+ annotations,
+ division,
+ print_function,
+ unicode_literals,
+)
+
import dataclasses
-from typing import Any
-import json
import importlib.metadata
+import json
+from typing import Any
-__author__ = '@titom73'
-__email__ = 'tom@inetsix.net'
-__date__ = '2022-03-16'
+__author__ = "@titom73"
+__email__ = "tom@inetsix.net"
+__date__ = "2022-03-16"
__version__ = importlib.metadata.version("eos-downloader")
# __all__ = ["CvpAuthenticationItem", "CvFeatureManager", "EOSDownloader", "ObjectDownloader", "reverse"]
ARISTA_GET_SESSION = "https://www.arista.com/custom_data/api/cvp/getSessionCode/"
-ARISTA_SOFTWARE_FOLDER_TREE = "https://www.arista.com/custom_data/api/cvp/getFolderTree/"
+ARISTA_SOFTWARE_FOLDER_TREE = (
+ "https://www.arista.com/custom_data/api/cvp/getFolderTree/"
+)
ARISTA_DOWNLOAD_URL = "https://www.arista.com/custom_data/api/cvp/getDownloadLink/"
@@ -36,11 +44,12 @@ check the Access Token. Then re-run the script with the correct token.
MSG_INVALID_DATA = """Invalid data returned by server
"""
-EVE_QEMU_FOLDER_PATH = '/opt/unetlab/addons/qemu/'
+EVE_QEMU_FOLDER_PATH = "/opt/unetlab/addons/qemu/"
class EnhancedJSONEncoder(json.JSONEncoder):
"""Custom JSon encoder."""
+
def default(self, o: Any) -> Any:
if dataclasses.is_dataclass(o):
return dataclasses.asdict(o)
diff --git a/eos_downloader/cli/cli.py b/eos_downloader/cli/cli.py
index ddd0dea..ad77f2b 100644
--- a/eos_downloader/cli/cli.py
+++ b/eos_downloader/cli/cli.py
@@ -11,49 +11,51 @@ ARDL CLI Baseline.
"""
import click
-from rich.console import Console
-import eos_downloader
-from eos_downloader.cli.get import commands as get_commands
+
+from eos_downloader import __version__
from eos_downloader.cli.debug import commands as debug_commands
+from eos_downloader.cli.get import commands as get_commands
from eos_downloader.cli.info import commands as info_commands
+from eos_downloader.cli.utils import AliasedGroup
+
-@click.group()
+@click.group(cls=AliasedGroup)
+@click.version_option(__version__)
@click.pass_context
-@click.option('--token', show_envvar=True, default=None, help='Arista Token from your customer account')
+@click.option(
+ "--token",
+ show_envvar=True,
+ default=None,
+ help="Arista Token from your customer account",
+)
def ardl(ctx: click.Context, token: str) -> None:
"""Arista Network Download CLI"""
ctx.ensure_object(dict)
- ctx.obj['token'] = token
+ ctx.obj["token"] = token
-@click.command()
-def version() -> None:
- """Display version of ardl"""
- console = Console()
- console.print(f'ardl is running version {eos_downloader.__version__}')
-
-
-@ardl.group(no_args_is_help=True)
+@ardl.group(cls=AliasedGroup, no_args_is_help=True)
@click.pass_context
-def get(ctx: click.Context) -> None:
+def get(ctx: click.Context, cls: click.Group = AliasedGroup) -> None:
# pylint: disable=redefined-builtin
"""Download Arista from Arista website"""
-@ardl.group(no_args_is_help=True)
+@ardl.group(cls=AliasedGroup, no_args_is_help=True)
@click.pass_context
-def info(ctx: click.Context) -> None:
+def info(ctx: click.Context, cls: click.Group = AliasedGroup) -> None:
# pylint: disable=redefined-builtin
"""List information from Arista website"""
-@ardl.group(no_args_is_help=True)
+@ardl.group(cls=AliasedGroup, no_args_is_help=True)
@click.pass_context
-def debug(ctx: click.Context) -> None:
+def debug(ctx: click.Context, cls: click.Group = AliasedGroup) -> None:
# pylint: disable=redefined-builtin
"""Debug commands to work with ardl"""
+
# ANTA CLI Execution
@@ -64,13 +66,9 @@ def cli() -> None:
get.add_command(get_commands.cvp)
info.add_command(info_commands.eos_versions)
debug.add_command(debug_commands.xml)
- ardl.add_command(version)
# Load CLI
- ardl(
- obj={},
- auto_envvar_prefix='arista'
- )
+ ardl(obj={}, auto_envvar_prefix="arista")
-if __name__ == '__main__':
+if __name__ == "__main__":
cli()
diff --git a/eos_downloader/cli/debug/commands.py b/eos_downloader/cli/debug/commands.py
index 107b8a0..5a0d7f8 100644
--- a/eos_downloader/cli/debug/commands.py
+++ b/eos_downloader/cli/debug/commands.py
@@ -22,32 +22,51 @@ import eos_downloader.eos
@click.command()
@click.pass_context
-@click.option('--output', default=str('arista.xml'), help='Path to save XML file', type=click.Path(), show_default=True)
-@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False))
+@click.option(
+ "--output",
+ default=str("arista.xml"),
+ help="Path to save XML file",
+ type=click.Path(),
+ show_default=True,
+)
+@click.option(
+ "--log-level",
+ "--log",
+ help="Logging level of the command",
+ default=None,
+ type=click.Choice(
+ ["debug", "info", "warning", "error", "critical"], case_sensitive=False
+ ),
+)
def xml(ctx: click.Context, output: str, log_level: str) -> None:
# sourcery skip: remove-unnecessary-cast
"""Extract XML directory structure"""
console = Console()
# Get from Context
- token = ctx.obj['token']
+ token = ctx.obj["token"]
logger.remove()
if log_level is not None:
logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
my_download = eos_downloader.eos.EOSDownloader(
- image='unset',
- software='EOS',
- version='unset',
+ image="unset",
+ software="EOS",
+ version="unset",
token=token,
- hash_method='sha512sum')
+ hash_method="sha512sum",
+ )
my_download.authenticate()
- xml_object: ET.ElementTree = my_download._get_folder_tree() # pylint: disable=protected-access
+ xml_object: ET.ElementTree = (
+ my_download.get_folder_tree()
+ ) # pylint: disable=protected-access
xml_content = xml_object.getroot()
- xmlstr = minidom.parseString(ET.tostring(xml_content)).toprettyxml(indent=" ", newl='')
- with open(output, "w", encoding='utf-8') as f:
+ xmlstr = minidom.parseString(ET.tostring(xml_content)).toprettyxml(
+ indent=" ", newl=""
+ )
+ with open(output, "w", encoding="utf-8") as f:
f.write(str(xmlstr))
- console.print(f'XML file saved in: { output }')
+ console.print(f"XML file saved in: { output }")
diff --git a/eos_downloader/cli/get/commands.py b/eos_downloader/cli/get/commands.py
index 13a8eec..b4525fe 100644
--- a/eos_downloader/cli/get/commands.py
+++ b/eos_downloader/cli/get/commands.py
@@ -21,68 +21,156 @@ from rich.console import Console
import eos_downloader.eos
from eos_downloader.models.version import BASE_VERSION_STR, RTYPE_FEATURE, RTYPES
-EOS_IMAGE_TYPE = ['64', 'INT', '2GB-INT', 'cEOS', 'cEOS64', 'vEOS', 'vEOS-lab', 'EOS-2GB', 'default']
-CVP_IMAGE_TYPE = ['ova', 'rpm', 'kvm', 'upgrade']
+EOS_IMAGE_TYPE = [
+ "64",
+ "INT",
+ "2GB-INT",
+ "cEOS",
+ "cEOS64",
+ "vEOS",
+ "vEOS-lab",
+ "EOS-2GB",
+ "default",
+]
+CVP_IMAGE_TYPE = ["ova", "rpm", "kvm", "upgrade"]
+
@click.command(no_args_is_help=True)
@click.pass_context
-@click.option('--image-type', default='default', help='EOS Image type', type=click.Choice(EOS_IMAGE_TYPE), required=True)
-@click.option('--version', default=None, help='EOS version', type=str, required=False)
-@click.option('--latest', '-l', is_flag=True, type=click.BOOL, default=False, help='Get latest version in given branch. If --branch is not use, get the latest branch with specific release type')
-@click.option('--release-type', '-rtype', type=click.Choice(RTYPES, case_sensitive=False), default=RTYPE_FEATURE, help='EOS release type to search')
-@click.option('--branch', '-b', type=click.STRING, default=None, help='EOS Branch to list releases')
-@click.option('--docker-name', default='arista/ceos', help='Docker image name (default: arista/ceos)', type=str, show_default=True)
-@click.option('--output', default=str(os.path.relpath(os.getcwd(), start=os.curdir)), help='Path to save image', type=click.Path(),show_default=True)
+@click.option(
+ "--image-type",
+ default="default",
+ help="EOS Image type",
+ type=click.Choice(EOS_IMAGE_TYPE),
+ required=True,
+)
+@click.option("--version", default=None, help="EOS version", type=str, required=False)
+@click.option(
+ "--latest",
+ "-l",
+ is_flag=True,
+ type=click.BOOL,
+ default=False,
+ help="Get latest version in given branch. If --branch is not use, get the latest branch with specific release type",
+)
+@click.option(
+ "--release-type",
+ "-rtype",
+ type=click.Choice(RTYPES, case_sensitive=False),
+ default=RTYPE_FEATURE,
+ help="EOS release type to search",
+)
+@click.option(
+ "--branch",
+ "-b",
+ type=click.STRING,
+ default=None,
+ help="EOS Branch to list releases",
+)
+@click.option(
+ "--docker-name",
+ default="arista/ceos",
+ help="Docker image name (default: arista/ceos)",
+ type=str,
+ show_default=True,
+)
+@click.option(
+ "--output",
+ default=str(os.path.relpath(os.getcwd(), start=os.curdir)),
+ help="Path to save image",
+ type=click.Path(),
+ show_default=True,
+)
# Debugging
-@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False))
+@click.option(
+ "--log-level",
+ "--log",
+ help="Logging level of the command",
+ default=None,
+ type=click.Choice(
+ ["debug", "info", "warning", "error", "critical"], case_sensitive=False
+ ),
+)
# Boolean triggers
-@click.option('--eve-ng', is_flag=True, help='Run EVE-NG vEOS provisioning (only if CLI runs on an EVE-NG server)', default=False)
-@click.option('--disable-ztp', is_flag=True, help='Disable ZTP process in vEOS image (only available with --eve-ng)', default=False)
-@click.option('--import-docker', is_flag=True, help='Import docker image (only available with --image_type cEOSlab)', default=False)
+@click.option(
+ "--eve-ng",
+ is_flag=True,
+ help="Run EVE-NG vEOS provisioning (only if CLI runs on an EVE-NG server)",
+ default=False,
+)
+@click.option(
+ "--disable-ztp",
+ is_flag=True,
+ help="Disable ZTP process in vEOS image (only available with --eve-ng)",
+ default=False,
+)
+@click.option(
+ "--import-docker",
+ is_flag=True,
+ help="Import docker image (only available with --image_type cEOSlab)",
+ default=False,
+)
def eos(
- ctx: click.Context, image_type: str, output: str, log_level: str, eve_ng: bool, disable_ztp: bool,
- import_docker: bool, docker_name: str, version: Union[str, None] = None, release_type: str = RTYPE_FEATURE,
- latest: bool = False, branch: Union[str,None] = None
- ) -> int:
+ ctx: click.Context,
+ image_type: str,
+ output: str,
+ log_level: str,
+ eve_ng: bool,
+ disable_ztp: bool,
+ import_docker: bool,
+ docker_name: str,
+ version: Union[str, None] = None,
+ release_type: str = RTYPE_FEATURE,
+ latest: bool = False,
+ branch: Union[str, None] = None,
+) -> int:
"""Download EOS image from Arista website"""
console = Console()
# Get from Context
- token = ctx.obj['token']
- if token is None or token == '':
- console.print('❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option', style="bold red")
+ token = ctx.obj["token"]
+ if token is None or token == "":
+ console.print(
+ "❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option",
+ style="bold red",
+ )
sys.exit(1)
logger.remove()
if log_level is not None:
logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
- console.print("🪐 [bold blue]eos-downloader[/bold blue] is starting...", )
- console.print(f' - Image Type: {image_type}')
- console.print(f' - Version: {version}')
-
+ console.print(
+ "🪐 [bold blue]eos-downloader[/bold blue] is starting...",
+ )
+ console.print(f" - Image Type: {image_type}")
+ console.print(f" - Version: {version}")
if version is not None:
my_download = eos_downloader.eos.EOSDownloader(
image=image_type,
- software='EOS',
+ software="EOS",
version=version,
token=token,
- hash_method='sha512sum')
+ hash_method="sha512sum",
+ )
my_download.authenticate()
elif latest:
my_download = eos_downloader.eos.EOSDownloader(
image=image_type,
- software='EOS',
- version='unset',
+ software="EOS",
+ version="unset",
token=token,
- hash_method='sha512sum')
+ hash_method="sha512sum",
+ )
my_download.authenticate()
if branch is None:
branch = str(my_download.latest_branch(rtype=release_type).branch)
latest_version = my_download.latest_eos(branch, rtype=release_type)
if str(latest_version) == BASE_VERSION_STR:
- console.print(f'[red]Error[/red], cannot find any version in {branch} for {release_type} release type')
+ console.print(
+ f"[red]Error[/red], cannot find any version in {branch} for {release_type} release type"
+ )
sys.exit(1)
my_download.version = str(latest_version)
@@ -92,46 +180,71 @@ def eos(
my_download.download_local(file_path=output, checksum=True)
if import_docker:
- my_download.docker_import(
- image_name=docker_name
- )
- console.print('✅ processing done !')
+ my_download.docker_import(image_name=docker_name)
+ console.print("✅ processing done !")
sys.exit(0)
-
@click.command(no_args_is_help=True)
@click.pass_context
-@click.option('--format', default='upgrade', help='CVP Image type', type=click.Choice(CVP_IMAGE_TYPE), required=True)
-@click.option('--version', default=None, help='CVP version', type=str, required=True)
-@click.option('--output', default=str(os.path.relpath(os.getcwd(), start=os.curdir)), help='Path to save image', type=click.Path(),show_default=True)
-@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False))
-def cvp(ctx: click.Context, version: str, format: str, output: str, log_level: str) -> int:
+@click.option(
+ "--format",
+ default="upgrade",
+ help="CVP Image type",
+ type=click.Choice(CVP_IMAGE_TYPE),
+ required=True,
+)
+@click.option("--version", default=None, help="CVP version", type=str, required=True)
+@click.option(
+ "--output",
+ default=str(os.path.relpath(os.getcwd(), start=os.curdir)),
+ help="Path to save image",
+ type=click.Path(),
+ show_default=True,
+)
+@click.option(
+ "--log-level",
+ "--log",
+ help="Logging level of the command",
+ default=None,
+ type=click.Choice(
+ ["debug", "info", "warning", "error", "critical"], case_sensitive=False
+ ),
+)
+def cvp(
+ ctx: click.Context, version: str, format: str, output: str, log_level: str
+) -> int:
"""Download CVP image from Arista website"""
console = Console()
# Get from Context
- token = ctx.obj['token']
- if token is None or token == '':
- console.print('❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option', style="bold red")
+ token = ctx.obj["token"]
+ if token is None or token == "":
+ console.print(
+ "❗ Token is unset ! Please configure ARISTA_TOKEN or use --token option",
+ style="bold red",
+ )
sys.exit(1)
logger.remove()
if log_level is not None:
logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
- console.print("🪐 [bold blue]eos-downloader[/bold blue] is starting...", )
- console.print(f' - Image Type: {format}')
- console.print(f' - Version: {version}')
+ console.print(
+ "🪐 [bold blue]eos-downloader[/bold blue] is starting...",
+ )
+ console.print(f" - Image Type: {format}")
+ console.print(f" - Version: {version}")
my_download = eos_downloader.eos.EOSDownloader(
image=format,
- software='CloudVision',
+ software="CloudVision",
version=version,
token=token,
- hash_method='md5sum')
+ hash_method="md5sum",
+ )
my_download.authenticate()
my_download.download_local(file_path=output, checksum=False)
- console.print('✅ processing done !')
+ console.print("✅ processing done !")
sys.exit(0)
diff --git a/eos_downloader/cli/info/commands.py b/eos_downloader/cli/info/commands.py
index b51003b..64097a1 100644
--- a/eos_downloader/cli/info/commands.py
+++ b/eos_downloader/cli/info/commands.py
@@ -24,12 +24,53 @@ from eos_downloader.models.version import BASE_VERSION_STR, RTYPE_FEATURE, RTYPE
@click.command(no_args_is_help=True)
@click.pass_context
-@click.option('--latest', '-l', is_flag=True, type=click.BOOL, default=False, help='Get latest version in given branch. If --branch is not use, get the latest branch with specific release type')
-@click.option('--release-type', '-rtype', type=click.Choice(RTYPES, case_sensitive=False), default=RTYPE_FEATURE, help='EOS release type to search')
-@click.option('--branch', '-b', type=click.STRING, default=None, help='EOS Branch to list releases')
-@click.option('--verbose', '-v', is_flag=True, type=click.BOOL, default=False, help='Human readable output. Default is none to use output in script)')
-@click.option('--log-level', '--log', help='Logging level of the command', default='warning', type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False))
-def eos_versions(ctx: click.Context, log_level: str, branch: Union[str,None] = None, release_type: str = RTYPE_FEATURE, latest: bool = False, verbose: bool = False) -> None:
+@click.option(
+ "--latest",
+ "-l",
+ is_flag=True,
+ type=click.BOOL,
+ default=False,
+ help="Get latest version in given branch. If --branch is not use, get the latest branch with specific release type",
+)
+@click.option(
+ "--release-type",
+ "-rtype",
+ type=click.Choice(RTYPES, case_sensitive=False),
+ default=RTYPE_FEATURE,
+ help="EOS release type to search",
+)
+@click.option(
+ "--branch",
+ "-b",
+ type=click.STRING,
+ default=None,
+ help="EOS Branch to list releases",
+)
+@click.option(
+ "--verbose",
+ "-v",
+ is_flag=True,
+ type=click.BOOL,
+ default=False,
+ help="Human readable output. Default is none to use output in script)",
+)
+@click.option(
+ "--log-level",
+ "--log",
+ help="Logging level of the command",
+ default="warning",
+ type=click.Choice(
+ ["debug", "info", "warning", "error", "critical"], case_sensitive=False
+ ),
+)
+def eos_versions(
+ ctx: click.Context,
+ log_level: str,
+ branch: Union[str, None] = None,
+ release_type: str = RTYPE_FEATURE,
+ latest: bool = False,
+ verbose: bool = False,
+) -> None:
# pylint: disable = too-many-branches
"""
List Available EOS version on Arista.com website.
@@ -42,22 +83,23 @@ def eos_versions(ctx: click.Context, log_level: str, branch: Union[str,None] = N
"""
console = Console()
# Get from Context
- token = ctx.obj['token']
+ token = ctx.obj["token"]
logger.remove()
if log_level is not None:
logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
my_download = eos_downloader.eos.EOSDownloader(
- image='unset',
- software='EOS',
- version='unset',
+ image="unset",
+ software="EOS",
+ version="unset",
token=token,
- hash_method='sha512sum')
+ hash_method="sha512sum",
+ )
auth = my_download.authenticate()
if verbose and auth:
- console.print('✅ Authenticated on arista.com')
+ console.print("✅ Authenticated on arista.com")
if release_type is not None:
release_type = release_type.upper()
@@ -67,21 +109,27 @@ def eos_versions(ctx: click.Context, log_level: str, branch: Union[str,None] = N
branch = str(my_download.latest_branch(rtype=release_type).branch)
latest_version = my_download.latest_eos(branch, rtype=release_type)
if str(latest_version) == BASE_VERSION_STR:
- console.print(f'[red]Error[/red], cannot find any version in {branch} for {release_type} release type')
+ console.print(
+ f"[red]Error[/red], cannot find any version in {branch} for {release_type} release type"
+ )
sys.exit(1)
if verbose:
- console.print(f'Branch {branch} has been selected with release type {release_type}')
+ console.print(
+ f"Branch {branch} has been selected with release type {release_type}"
+ )
if branch is not None:
- console.print(f'Latest release for {branch}: {latest_version}')
+ console.print(f"Latest release for {branch}: {latest_version}")
else:
- console.print(f'Latest EOS release: {latest_version}')
+ console.print(f"Latest EOS release: {latest_version}")
else:
- console.print(f'{ latest_version }')
+ console.print(f"{ latest_version }")
else:
versions = my_download.get_eos_versions(branch=branch, rtype=release_type)
if verbose:
- console.print(f'List of available versions for {branch if branch is not None else "all branches"}')
+ console.print(
+ f'List of available versions for {branch if branch is not None else "all branches"}'
+ )
for version in versions:
- console.print(f' → {str(version)}')
+ console.print(f" → {str(version)}")
else:
pprint([str(version) for version in versions])
diff --git a/eos_downloader/cli/utils.py b/eos_downloader/cli/utils.py
new file mode 100644
index 0000000..4a14f53
--- /dev/null
+++ b/eos_downloader/cli/utils.py
@@ -0,0 +1,38 @@
+#!/usr/bin/python
+# coding: utf-8 -*-
+# pylint: disable=inconsistent-return-statements
+
+
+"""
+Extension for the python ``click`` module
+to provide a group or command with aliases.
+"""
+
+
+from typing import Any
+import click
+
+
+class AliasedGroup(click.Group):
+ """
+ Implements a subclass of Group that accepts a prefix for a command.
+ If there were a command called push, it would accept pus as an alias (so long as it was unique)
+ """
+ def get_command(self, ctx: click.Context, cmd_name: str) -> Any:
+ """Documentation to build"""
+ rv = click.Group.get_command(self, ctx, cmd_name)
+ if rv is not None:
+ return rv
+ matches = [x for x in self.list_commands(ctx)
+ if x.startswith(cmd_name)]
+ if not matches:
+ return None
+ if len(matches) == 1:
+ return click.Group.get_command(self, ctx, matches[0])
+ ctx.fail(f"Too many matches: {', '.join(sorted(matches))}")
+
+ def resolve_command(self, ctx: click.Context, args: Any) -> Any:
+ """Documentation to build"""
+ # always return the full command name
+ _, cmd, args = super().resolve_command(ctx, args)
+ return cmd.name, cmd, args
diff --git a/eos_downloader/cvp.py b/eos_downloader/cvp.py
index 6f14eb0..678daa8 100644
--- a/eos_downloader/cvp.py
+++ b/eos_downloader/cvp.py
@@ -6,11 +6,12 @@ CVP Uploader content
"""
import os
-from typing import List, Optional, Any
from dataclasses import dataclass
-from loguru import logger
+from typing import Any, List, Optional
+
from cvprac.cvp_client import CvpClient
from cvprac.cvp_client_errors import CvpLoginError
+from loguru import logger
# from eos_downloader.tools import exc_to_str
@@ -20,8 +21,9 @@ from cvprac.cvp_client_errors import CvpLoginError
@dataclass
class CvpAuthenticationItem:
"""
- Data structure to represent Cloudvision Authentication
+ Data structure to represent Cloudvision Authentication
"""
+
server: str
port: int = 443
token: Optional[str] = None
@@ -29,15 +31,16 @@ class CvpAuthenticationItem:
validate_cert: bool = False
-class Filer():
+class Filer:
# pylint: disable=too-few-public-methods
"""
Filer Helper for file management
"""
+
def __init__(self, path: str) -> None:
self.file_exist = False
- self.filename = ''
- self.absolute_path = ''
+ self.filename = ""
+ self.absolute_path = ""
self.relative_path = path
if os.path.exists(path):
self.file_exist = True
@@ -45,13 +48,14 @@ class Filer():
self.absolute_path = os.path.realpath(path)
def __repr__(self) -> str:
- return self.absolute_path if self.file_exist else ''
+ return self.absolute_path if self.file_exist else ""
-class CvFeatureManager():
+class CvFeatureManager:
"""
CvFeatureManager Object to interect with Cloudvision
"""
+
def __init__(self, authentication: CvpAuthenticationItem) -> None:
"""
__init__ Class Creator
@@ -86,19 +90,21 @@ class CvFeatureManager():
try:
client.connect(
nodes=[authentication.server],
- username='',
- password='',
+ username="",
+ password="",
api_token=authentication.token,
is_cvaas=True,
port=authentication.port,
cert=authentication.validate_cert,
- request_timeout=authentication.timeout
+ request_timeout=authentication.timeout,
)
except CvpLoginError as error_data:
- logger.error(f'Cannot connect to Cloudvision server {authentication.server}')
- logger.debug(f'Error message: {error_data}')
- logger.info('connected to Cloudvision server')
- logger.debug(f'Connection info: {authentication}')
+ logger.error(
+ f"Cannot connect to Cloudvision server {authentication.server}"
+ )
+ logger.debug(f"Error message: {error_data}")
+ logger.info("connected to Cloudvision server")
+ logger.debug(f"Connection info: {authentication}")
return client
def __get_images(self) -> List[Any]:
@@ -111,8 +117,8 @@ class CvFeatureManager():
Fact returned by Cloudvision
"""
images = []
- logger.debug(' -> Collecting images')
- images = self._cv_instance.api.get_images()['data']
+ logger.debug(" -> Collecting images")
+ images = self._cv_instance.api.get_images()["data"]
return images if self.__check_api_result(images) else []
# def __get_bundles(self):
@@ -161,7 +167,11 @@ class CvFeatureManager():
bool
True if present
"""
- return any(image_name == image['name'] for image in self._cv_images) if isinstance(self._cv_images, list) else False
+ return (
+ any(image_name == image["name"] for image in self._cv_images)
+ if isinstance(self._cv_images, list)
+ else False
+ )
def _does_bundle_exist(self, bundle_name: str) -> bool:
# pylint: disable=unused-argument
@@ -192,19 +202,23 @@ class CvFeatureManager():
"""
image_item = Filer(path=image_path)
if image_item.file_exist is False:
- logger.error(f'File not found: {image_item.relative_path}')
+ logger.error(f"File not found: {image_item.relative_path}")
return False
- logger.info(f'File path for image: {image_item}')
+ logger.info(f"File path for image: {image_item}")
if self._does_image_exist(image_name=image_item.filename):
- logger.error("Image found in Cloudvision , Please delete it before running this script")
+ logger.error(
+ "Image found in Cloudvision , Please delete it before running this script"
+ )
return False
try:
- upload_result = self._cv_instance.api.add_image(filepath=image_item.absolute_path)
+ upload_result = self._cv_instance.api.add_image(
+ filepath=image_item.absolute_path
+ )
except Exception as e: # pylint: disable=broad-exception-caught
- logger.error('An error occurred during upload, check CV connection')
- logger.error(f'Exception message is: {e}')
+ logger.error("An error occurred during upload, check CV connection")
+ logger.error(f"Exception message is: {e}")
return False
- logger.debug(f'Upload Result is : {upload_result}')
+ logger.debug(f"Upload Result is : {upload_result}")
return True
def build_image_list(self, image_list: List[str]) -> List[Any]:
@@ -252,25 +266,30 @@ class CvFeatureManager():
bool
True if succeeds
"""
- logger.debug(f'Init creation of an image bundle {name} with following images {images_name}')
+ logger.debug(
+ f"Init creation of an image bundle {name} with following images {images_name}"
+ )
all_images_present: List[bool] = []
self._cv_images = self.__get_images()
all_images_present.extend(
- self._does_image_exist(image_name=image_name)
- for image_name in images_name
+ self._does_image_exist(image_name=image_name) for image_name in images_name
)
# Bundle Create
if self._does_bundle_exist(bundle_name=name) is False:
- logger.debug(f'Creating image bundle {name} with following images {images_name}')
+ logger.debug(
+ f"Creating image bundle {name} with following images {images_name}"
+ )
images_data = self.build_image_list(image_list=images_name)
if images_data is not None:
- logger.debug('Images information: {images_data}')
+ logger.debug("Images information: {images_data}")
try:
- data = self._cv_instance.api.save_image_bundle(name=name, images=images_data)
+ data = self._cv_instance.api.save_image_bundle(
+ name=name, images=images_data
+ )
except Exception as e: # pylint: disable=broad-exception-caught
- logger.critical(f'{e}')
+ logger.critical(f"{e}")
else:
logger.debug(data)
return True
- logger.critical('No data found for images')
+ logger.critical("No data found for images")
return False
diff --git a/eos_downloader/data.py b/eos_downloader/data.py
index 74f2f8e..ba54b3b 100644
--- a/eos_downloader/data.py
+++ b/eos_downloader/data.py
@@ -12,82 +12,22 @@ Data are built from content of Arista XML file
# [platform][image][version]
DATA_MAPPING = {
"CloudVision": {
- "ova": {
- "extension": ".ova",
- "prepend": "cvp",
- "folder_level": 0
- },
- "rpm": {
- "extension": "",
- "prepend": "cvp-rpm-installer",
- "folder_level": 0
- },
- "kvm": {
- "extension": "-kvm.tgz",
- "prepend": "cvp",
- "folder_level": 0
- },
- "upgrade": {
- "extension": ".tgz",
- "prepend": "cvp-upgrade",
- "folder_level": 0
- },
+ "ova": {"extension": ".ova", "prepend": "cvp", "folder_level": 0},
+ "rpm": {"extension": "", "prepend": "cvp-rpm-installer", "folder_level": 0},
+ "kvm": {"extension": "-kvm.tgz", "prepend": "cvp", "folder_level": 0},
+ "upgrade": {"extension": ".tgz", "prepend": "cvp-upgrade", "folder_level": 0},
},
"EOS": {
- "64": {
- "extension": ".swi",
- "prepend": "EOS64",
- "folder_level": 0
- },
- "INT": {
- "extension": "-INT.swi",
- "prepend": "EOS",
- "folder_level": 1
- },
- "2GB-INT": {
- "extension": "-INT.swi",
- "prepend": "EOS-2GB",
- "folder_level": 1
- },
- "cEOS": {
- "extension": ".tar.xz",
- "prepend": "cEOS-lab",
- "folder_level": 0
- },
- "cEOS64": {
- "extension": ".tar.xz",
- "prepend": "cEOS64-lab",
- "folder_level": 0
- },
- "vEOS": {
- "extension": ".vmdk",
- "prepend": "vEOS",
- "folder_level": 0
- },
- "vEOS-lab": {
- "extension": ".vmdk",
- "prepend": "vEOS-lab",
- "folder_level": 0
- },
- "EOS-2GB": {
- "extension": ".swi",
- "prepend": "EOS-2GB",
- "folder_level": 0
- },
- "RN": {
- "extension": "-",
- "prepend": "RN",
- "folder_level": 0
- },
- "SOURCE": {
- "extension": "-source.tar",
- "prepend": "EOS",
- "folder_level": 0
- },
- "default": {
- "extension": ".swi",
- "prepend": "EOS",
- "folder_level": 0
- }
- }
+ "64": {"extension": ".swi", "prepend": "EOS64", "folder_level": 0},
+ "INT": {"extension": "-INT.swi", "prepend": "EOS", "folder_level": 1},
+ "2GB-INT": {"extension": "-INT.swi", "prepend": "EOS-2GB", "folder_level": 1},
+ "cEOS": {"extension": ".tar.xz", "prepend": "cEOS-lab", "folder_level": 0},
+ "cEOS64": {"extension": ".tar.xz", "prepend": "cEOS64-lab", "folder_level": 0},
+ "vEOS": {"extension": ".vmdk", "prepend": "vEOS", "folder_level": 0},
+ "vEOS-lab": {"extension": ".vmdk", "prepend": "vEOS-lab", "folder_level": 0},
+ "EOS-2GB": {"extension": ".swi", "prepend": "EOS-2GB", "folder_level": 0},
+ "RN": {"extension": "-", "prepend": "RN", "folder_level": 0},
+ "SOURCE": {"extension": "-source.tar", "prepend": "EOS", "folder_level": 0},
+ "default": {"extension": ".swi", "prepend": "EOS", "folder_level": 0},
+ },
}
diff --git a/eos_downloader/download.py b/eos_downloader/download.py
index 2297b04..2c9576e 100644
--- a/eos_downloader/download.py
+++ b/eos_downloader/download.py
@@ -8,13 +8,20 @@ import os.path
import signal
from concurrent.futures import ThreadPoolExecutor
from threading import Event
-from typing import Iterable, Any
+from typing import Any, Iterable
import requests
import rich
from rich import console
-from rich.progress import (BarColumn, DownloadColumn, Progress, TaskID,
- TextColumn, TimeElapsedColumn, TransferSpeedColumn)
+from rich.progress import (
+ BarColumn,
+ DownloadColumn,
+ Progress,
+ TaskID,
+ TextColumn,
+ TimeElapsedColumn,
+ TransferSpeedColumn,
+)
console = rich.get_console()
done_event = Event()
@@ -28,7 +35,7 @@ def handle_sigint(signum: Any, frame: Any) -> None:
signal.signal(signal.SIGINT, handle_sigint)
-class DownloadProgressBar():
+class DownloadProgressBar:
"""
Object to manage Download process with Progress Bar from Rich
"""
@@ -38,7 +45,9 @@ class DownloadProgressBar():
Class Constructor
"""
self.progress = Progress(
- TextColumn("💾 Downloading [bold blue]{task.fields[filename]}", justify="right"),
+ TextColumn(
+ "💾 Downloading [bold blue]{task.fields[filename]}", justify="right"
+ ),
BarColumn(bar_width=None),
"[progress.percentage]{task.percentage:>3.1f}%",
"•",
@@ -48,14 +57,16 @@ class DownloadProgressBar():
"•",
TimeElapsedColumn(),
"•",
- console=console
+ console=console,
)
- def _copy_url(self, task_id: TaskID, url: str, path: str, block_size: int = 1024) -> bool:
+ def _copy_url(
+ self, task_id: TaskID, url: str, path: str, block_size: int = 1024
+ ) -> bool:
"""Copy data from a url to a local file."""
response = requests.get(url, stream=True, timeout=5)
# This will break if the response doesn't contain content length
- self.progress.update(task_id, total=int(response.headers['Content-Length']))
+ self.progress.update(task_id, total=int(response.headers["Content-Length"]))
with open(path, "wb") as dest_file:
self.progress.start_task(task_id)
for data in response.iter_content(chunk_size=block_size):
@@ -71,7 +82,9 @@ class DownloadProgressBar():
with self.progress:
with ThreadPoolExecutor(max_workers=4) as pool:
for url in urls:
- filename = url.split("/")[-1].split('?')[0]
+ filename = url.split("/")[-1].split("?")[0]
dest_path = os.path.join(dest_dir, filename)
- task_id = self.progress.add_task("download", filename=filename, start=False)
+ task_id = self.progress.add_task(
+ "download", filename=filename, start=False
+ )
pool.submit(self._copy_url, task_id, url, dest_path)
diff --git a/eos_downloader/eos.py b/eos_downloader/eos.py
index e5f3670..716992f 100644
--- a/eos_downloader/eos.py
+++ b/eos_downloader/eos.py
@@ -14,13 +14,20 @@ import rich
from loguru import logger
from rich import console
-from eos_downloader.models.version import BASE_BRANCH_STR, BASE_VERSION_STR, REGEX_EOS_VERSION, RTYPE_FEATURE, EosVersion
+from eos_downloader.models.version import (
+ BASE_BRANCH_STR,
+ BASE_VERSION_STR,
+ REGEX_EOS_VERSION,
+ RTYPE_FEATURE,
+ EosVersion,
+)
from eos_downloader.object_downloader import ObjectDownloader
# logger = logging.getLogger(__name__)
console = rich.get_console()
+
class EOSDownloader(ObjectDownloader):
"""
EOSDownloader Object to download EOS images from Arista.com website
@@ -47,22 +54,27 @@ class EOSDownloader(ObjectDownloader):
file_path : str
Path where EOS image is located
"""
- logger.info('Mounting volume to disable ZTP')
- console.print('🚀 Mounting volume to disable ZTP')
+ logger.info("Mounting volume to disable ZTP")
+ console.print("🚀 Mounting volume to disable ZTP")
raw_folder = os.path.join(file_path, "raw")
os.system(f"rm -rf {raw_folder}")
os.system(f"mkdir -p {raw_folder}")
os.system(
- f'guestmount -a {os.path.join(file_path, "hda.qcow2")} -m /dev/sda2 {os.path.join(file_path, "raw")}')
- ztp_file = os.path.join(file_path, 'raw/zerotouch-config')
- with open(ztp_file, 'w', encoding='ascii') as zfile:
- zfile.write('DISABLE=True')
- logger.info(f'Unmounting volume in {file_path}')
+ f'guestmount -a {os.path.join(file_path, "hda.qcow2")} -m /dev/sda2 {os.path.join(file_path, "raw")}'
+ )
+ ztp_file = os.path.join(file_path, "raw/zerotouch-config")
+ with open(ztp_file, "w", encoding="ascii") as zfile:
+ zfile.write("DISABLE=True")
+ logger.info(f"Unmounting volume in {file_path}")
os.system(f"guestunmount {os.path.join(file_path, 'raw')}")
os.system(f"rm -rf {os.path.join(file_path, 'raw')}")
logger.info(f"Volume has been successfully unmounted at {file_path}")
- def _parse_xml_for_version(self,root_xml: ET.ElementTree, xpath: str = './/dir[@label="Active Releases"]/dir/dir/[@label]') -> List[EosVersion]:
+ def _parse_xml_for_version(
+ self,
+ root_xml: ET.ElementTree,
+ xpath: str = './/dir[@label="Active Releases"]/dir/dir/[@label]',
+ ) -> List[EosVersion]:
"""
Extract list of available EOS versions from Arista.com website
@@ -77,19 +89,21 @@ class EOSDownloader(ObjectDownloader):
"""
# XPATH: .//dir[@label="Active Releases"]/dir/dir/[@label]
if self.eos_versions is None:
- logger.debug(f'Using xpath {xpath}')
+ logger.debug(f"Using xpath {xpath}")
eos_versions = []
for node in root_xml.findall(xpath):
- if 'label' in node.attrib and node.get('label') is not None:
- label = node.get('label')
+ if "label" in node.attrib and node.get("label") is not None:
+ label = node.get("label")
if label is not None and REGEX_EOS_VERSION.match(label):
eos_version = EosVersion.from_str(label)
eos_versions.append(eos_version)
logger.debug(f"Found {label} - {eos_version}")
- logger.debug(f'List of versions found on arista.com is: {eos_versions}')
+ logger.debug(f"List of versions found on arista.com is: {eos_versions}")
self.eos_versions = eos_versions
else:
- logger.debug('receiving instruction to download versions, but already available')
+ logger.debug(
+ "receiving instruction to download versions, but already available"
+ )
return self.eos_versions
def _get_branches(self, with_rtype: str = RTYPE_FEATURE) -> List[str]:
@@ -104,9 +118,11 @@ class EOSDownloader(ObjectDownloader):
Returns:
List[str]: A lsit of string that represent all availables EOS branches
"""
- root = self._get_folder_tree()
+ root = self.get_folder_tree()
versions = self._parse_xml_for_version(root_xml=root)
- return list({version.branch for version in versions if version.rtype == with_rtype})
+ return list(
+ {version.branch for version in versions if version.rtype == with_rtype}
+ )
def latest_branch(self, rtype: str = RTYPE_FEATURE) -> EosVersion:
"""
@@ -125,7 +141,9 @@ class EOSDownloader(ObjectDownloader):
selected_branch = branch
return selected_branch
- def get_eos_versions(self, branch: Union[str,None] = None, rtype: Union[str,None] = None) -> List[EosVersion]:
+ def get_eos_versions(
+ self, branch: Union[str, None] = None, rtype: Union[str, None] = None
+ ) -> List[EosVersion]:
"""
Get a list of available EOS version available on arista.com
@@ -139,16 +157,22 @@ class EOSDownloader(ObjectDownloader):
Returns:
List[EosVersion]: A list of versions available
"""
- root = self._get_folder_tree()
+ root = self.get_folder_tree()
result = []
for version in self._parse_xml_for_version(root_xml=root):
if branch is None and (version.rtype == rtype or rtype is None):
result.append(version)
- elif branch is not None and version.is_in_branch(branch) and version.rtype == rtype:
+ elif (
+ branch is not None
+ and version.is_in_branch(branch)
+ and version.rtype == rtype
+ ):
result.append(version)
return result
- def latest_eos(self, branch: Union[str,None] = None, rtype: str = RTYPE_FEATURE) -> EosVersion:
+ def latest_eos(
+ self, branch: Union[str, None] = None, rtype: str = RTYPE_FEATURE
+ ) -> EosVersion:
"""
Get latest version of EOS
@@ -168,7 +192,9 @@ class EOSDownloader(ObjectDownloader):
latest_branch = self.latest_branch(rtype=rtype)
else:
latest_branch = EosVersion.from_str(branch)
- for version in self.get_eos_versions(branch=str(latest_branch.branch), rtype=rtype):
+ for version in self.get_eos_versions(
+ branch=str(latest_branch.branch), rtype=rtype
+ ):
if version > selected_version:
if rtype is not None and version.rtype == rtype:
selected_version = version
diff --git a/eos_downloader/models/version.py b/eos_downloader/models/version.py
index 4b051a5..22de100 100644
--- a/eos_downloader/models/version.py
+++ b/eos_downloader/models/version.py
@@ -16,11 +16,11 @@ from eos_downloader.tools import exc_to_str
# logger = logging.getLogger(__name__)
-BASE_VERSION_STR = '4.0.0F'
-BASE_BRANCH_STR = '4.0'
+BASE_VERSION_STR = "4.0.0F"
+BASE_BRANCH_STR = "4.0"
-RTYPE_FEATURE = 'F'
-RTYPE_MAINTENANCE = 'M'
+RTYPE_FEATURE = "F"
+RTYPE_MAINTENANCE = "M"
RTYPES = [RTYPE_FEATURE, RTYPE_MAINTENANCE]
# Regular Expression to capture multiple EOS version format
@@ -29,8 +29,12 @@ RTYPES = [RTYPE_FEATURE, RTYPE_MAINTENANCE]
# 4.21.1M
# 4.28.10.F
# 4.28.6.1M
-REGEX_EOS_VERSION = re.compile(r"^.*(?P<major>4)\.(?P<minor>\d{1,2})\.(?P<patch>\d{1,2})(?P<other>\.\d*)*(?P<rtype>[M,F])*$")
-REGEX_EOS_BRANCH = re.compile(r"^.*(?P<major>4)\.(?P<minor>\d{1,2})(\.?P<patch>\d)*(\.\d)*(?P<rtype>[M,F])*$")
+REGEX_EOS_VERSION = re.compile(
+ r"^.*(?P<major>4)\.(?P<minor>\d{1,2})\.(?P<patch>\d{1,2})(?P<other>\.\d*)*(?P<rtype>[M,F])*$"
+)
+REGEX_EOS_BRANCH = re.compile(
+ r"^.*(?P<major>4)\.(?P<minor>\d{1,2})(\.?P<patch>\d)*(\.\d)*(?P<rtype>[M,F])*$"
+)
class EosVersion(BaseModel):
@@ -59,10 +63,11 @@ class EosVersion(BaseModel):
Args:
BaseModel (Pydantic): Pydantic Base Model
"""
+
major: int = 4
minor: int = 0
patch: int = 0
- rtype: Optional[str] = 'F'
+ rtype: Optional[str] = "F"
other: Any = None
@classmethod
@@ -84,7 +89,7 @@ class EosVersion(BaseModel):
Returns:
EosVersion object
"""
- logger.debug(f'receiving version: {eos_version}')
+ logger.debug(f"receiving version: {eos_version}")
if REGEX_EOS_VERSION.match(eos_version):
matches = REGEX_EOS_VERSION.match(eos_version)
# assert matches is not None
@@ -95,7 +100,7 @@ class EosVersion(BaseModel):
# assert matches is not None
assert matches is not None
return cls(**matches.groupdict())
- logger.error(f'Error occured with {eos_version}')
+ logger.error(f"Error occured with {eos_version}")
return EosVersion()
@property
@@ -106,7 +111,7 @@ class EosVersion(BaseModel):
Returns:
str: branch from version
"""
- return f'{self.major}.{self.minor}'
+ return f"{self.major}.{self.minor}"
def __str__(self) -> str:
"""
@@ -118,8 +123,8 @@ class EosVersion(BaseModel):
str: A standard EOS version string representing <MAJOR>.<MINOR>.<PATCH><RTYPE>
"""
if self.other is None:
- return f'{self.major}.{self.minor}.{self.patch}{self.rtype}'
- return f'{self.major}.{self.minor}.{self.patch}{self.other}{self.rtype}'
+ return f"{self.major}.{self.minor}.{self.patch}{self.rtype}"
+ return f"{self.major}.{self.minor}.{self.patch}{self.other}{self.rtype}"
def _compare(self, other: EosVersion) -> float:
"""
@@ -141,58 +146,68 @@ class EosVersion(BaseModel):
float: -1 if ver1 < ver2, 0 if ver1 == ver2, 1 if ver1 > ver2
"""
if not isinstance(other, EosVersion):
- raise ValueError(f'could not compare {other} as it is not an EosVersion object')
+ raise ValueError(
+ f"could not compare {other} as it is not an EosVersion object"
+ )
comparison_flag: float = 0
- logger.warning(f'current version {self.__str__()} - other {str(other)}') # pylint: disable = unnecessary-dunder-call
+ logger.warning(
+ f"current version {self.__str__()} - other {str(other)}" # pylint: disable = unnecessary-dunder-call
+ )
for key, _ in self.dict().items():
- if comparison_flag == 0 and self.dict()[key] is None or other.dict()[key] is None:
- logger.debug(f'{key}: local None - remote None')
- logger.debug(f'{key}: local {self.dict()} - remote {other.dict()}')
+ if (
+ comparison_flag == 0
+ and self.dict()[key] is None
+ or other.dict()[key] is None
+ ):
+ logger.debug(f"{key}: local None - remote None")
+ logger.debug(f"{key}: local {self.dict()} - remote {other.dict()}")
return comparison_flag
- logger.debug(f'{key}: local {self.dict()[key]} - remote {other.dict()[key]}')
+ logger.debug(
+ f"{key}: local {self.dict()[key]} - remote {other.dict()[key]}"
+ )
if comparison_flag == 0 and self.dict()[key] < other.dict()[key]:
comparison_flag = -1
if comparison_flag == 0 and self.dict()[key] > other.dict()[key]:
comparison_flag = 1
if comparison_flag != 0:
- logger.info(f'comparison result is {comparison_flag}')
+ logger.info(f"comparison result is {comparison_flag}")
return comparison_flag
- logger.info(f'comparison result is {comparison_flag}')
+ logger.info(f"comparison result is {comparison_flag}")
return comparison_flag
@typing.no_type_check
def __eq__(self, other):
- """ Implement __eq__ function (==) """
+ """Implement __eq__ function (==)"""
return self._compare(other) == 0
@typing.no_type_check
def __ne__(self, other):
# type: ignore
- """ Implement __nw__ function (!=) """
+ """Implement __nw__ function (!=)"""
return self._compare(other) != 0
@typing.no_type_check
def __lt__(self, other):
# type: ignore
- """ Implement __lt__ function (<) """
+ """Implement __lt__ function (<)"""
return self._compare(other) < 0
@typing.no_type_check
def __le__(self, other):
# type: ignore
- """ Implement __le__ function (<=) """
+ """Implement __le__ function (<=)"""
return self._compare(other) <= 0
@typing.no_type_check
def __gt__(self, other):
# type: ignore
- """ Implement __gt__ function (>) """
+ """Implement __gt__ function (>)"""
return self._compare(other) > 0
@typing.no_type_check
def __ge__(self, other):
# type: ignore
- """ Implement __ge__ function (>=) """
+ """Implement __ge__ function (>=)"""
return self._compare(other) >= 0
def match(self, match_expr: str) -> bool:
@@ -236,7 +251,7 @@ class EosVersion(BaseModel):
"['<', '>', '==', '<=', '>=', '!=']. "
f"You provided: {match_expr}"
)
- logger.debug(f'work on comparison {prefix} with base release {match_version}')
+ logger.debug(f"work on comparison {prefix} with base release {match_version}")
possibilities_dict = {
">": (1,),
"<": (-1,),
@@ -263,7 +278,7 @@ class EosVersion(BaseModel):
bool: True if current version is in provided branch, otherwise False
"""
try:
- logger.debug(f'reading branch str:{branch_str}')
+ logger.debug(f"reading branch str:{branch_str}")
branch = EosVersion.from_str(branch_str)
except Exception as error: # pylint: disable = broad-exception-caught
logger.error(exc_to_str(error))
diff --git a/eos_downloader/object_downloader.py b/eos_downloader/object_downloader.py
index 0420acb..d7b1418 100644
--- a/eos_downloader/object_downloader.py
+++ b/eos_downloader/object_downloader.py
@@ -8,8 +8,13 @@
eos_downloader class definition
"""
-from __future__ import (absolute_import, division, print_function,
- unicode_literals, annotations)
+from __future__ import (
+ absolute_import,
+ annotations,
+ division,
+ print_function,
+ unicode_literals,
+)
import base64
import glob
@@ -26,9 +31,14 @@ from loguru import logger
from rich import console
from tqdm import tqdm
-from eos_downloader import (ARISTA_DOWNLOAD_URL, ARISTA_GET_SESSION,
- ARISTA_SOFTWARE_FOLDER_TREE, EVE_QEMU_FOLDER_PATH,
- MSG_INVALID_DATA, MSG_TOKEN_EXPIRED)
+from eos_downloader import (
+ ARISTA_DOWNLOAD_URL,
+ ARISTA_GET_SESSION,
+ ARISTA_SOFTWARE_FOLDER_TREE,
+ EVE_QEMU_FOLDER_PATH,
+ MSG_INVALID_DATA,
+ MSG_TOKEN_EXPIRED,
+)
from eos_downloader.data import DATA_MAPPING
from eos_downloader.download import DownloadProgressBar
@@ -37,11 +47,19 @@ from eos_downloader.download import DownloadProgressBar
console = rich.get_console()
-class ObjectDownloader():
+class ObjectDownloader:
"""
ObjectDownloader Generic Object to download from Arista.com
"""
- def __init__(self, image: str, version: str, token: str, software: str = 'EOS', hash_method: str = 'md5sum'):
+
+ def __init__(
+ self,
+ image: str,
+ version: str,
+ token: str,
+ software: str = "EOS",
+ hash_method: str = "md5sum",
+ ):
"""
__init__ Class constructor
@@ -70,10 +88,10 @@ class ObjectDownloader():
self.hash_method = hash_method
self.timeout = 5
# Logging
- logger.debug(f'Filename built by _build_filename is {self.filename}')
+ logger.debug(f"Filename built by _build_filename is {self.filename}")
def __str__(self) -> str:
- return f'{self.software} - {self.image} - {self.version}'
+ return f"{self.software} - {self.image} - {self.version}"
# def __repr__(self):
# return str(self.__dict__)
@@ -102,16 +120,18 @@ class ObjectDownloader():
str:
Filename to search for on Arista.com
"""
- logger.info('start build')
+ logger.info("start build")
if self.software in DATA_MAPPING:
- logger.info(f'software in data mapping: {self.software}')
+ logger.info(f"software in data mapping: {self.software}")
if self.image in DATA_MAPPING[self.software]:
- logger.info(f'image in data mapping: {self.image}')
+ logger.info(f"image in data mapping: {self.image}")
return f"{DATA_MAPPING[self.software][self.image]['prepend']}-{self.version}{DATA_MAPPING[self.software][self.image]['extension']}"
return f"{DATA_MAPPING[self.software]['default']['prepend']}-{self.version}{DATA_MAPPING[self.software]['default']['extension']}"
- raise ValueError(f'Incorrect value for software {self.software}')
+ raise ValueError(f"Incorrect value for software {self.software}")
- def _parse_xml_for_path(self, root_xml: ET.ElementTree, xpath: str, search_file: str) -> str:
+ def _parse_xml_for_path(
+ self, root_xml: ET.ElementTree, xpath: str, search_file: str
+ ) -> str:
# sourcery skip: remove-unnecessary-cast
"""
_parse_xml Read and extract data from XML using XPATH
@@ -132,18 +152,18 @@ class ObjectDownloader():
str
File Path on Arista server side
"""
- logger.debug(f'Using xpath {xpath}')
- logger.debug(f'Search for file {search_file}')
- console.print(f'🔎 Searching file {search_file}')
+ logger.debug(f"Using xpath {xpath}")
+ logger.debug(f"Search for file {search_file}")
+ console.print(f"🔎 Searching file {search_file}")
for node in root_xml.findall(xpath):
# logger.debug('Found {}', node.text)
if str(node.text).lower() == search_file.lower():
- path = node.get('path')
- console.print(f' -> Found file at {path}')
+ path = node.get("path")
+ console.print(f" -> Found file at {path}")
logger.info(f'Found {node.text} at {node.get("path")}')
- return str(node.get('path')) if node.get('path') is not None else ''
- logger.error(f'Requested file ({self.filename}) not found !')
- return ''
+ return str(node.get("path")) if node.get("path") is not None else ""
+ logger.error(f"Requested file ({self.filename}) not found !")
+ return ""
def _get_hash(self, file_path: str) -> str:
"""
@@ -165,10 +185,10 @@ class ObjectDownloader():
dl_rich_progress_bar = DownloadProgressBar()
dl_rich_progress_bar.download(urls=[hash_url], dest_dir=file_path)
hash_downloaded = f"{file_path}/{os.path.basename(remote_hash_file)}"
- hash_content = 'unset'
- with open(hash_downloaded, 'r', encoding='utf-8') as f:
+ hash_content = "unset"
+ with open(hash_downloaded, "r", encoding="utf-8") as f:
hash_content = f.read()
- return hash_content.split(' ')[0]
+ return hash_content.split(" ")[0]
@staticmethod
def _compute_hash_md5sum(file: str, hash_expected: str) -> bool:
@@ -195,7 +215,9 @@ class ObjectDownloader():
hash_md5.update(chunk)
if hash_md5.hexdigest() == hash_expected:
return True
- logger.warning(f'Downloaded file is corrupt: local md5 ({hash_md5.hexdigest()}) is different to md5 from arista ({hash_expected})')
+ logger.warning(
+ f"Downloaded file is corrupt: local md5 ({hash_md5.hexdigest()}) is different to md5 from arista ({hash_expected})"
+ )
return False
@staticmethod
@@ -223,10 +245,12 @@ class ObjectDownloader():
hash_sha512.update(chunk)
if hash_sha512.hexdigest() == hash_expected:
return True
- logger.warning(f'Downloaded file is corrupt: local sha512 ({hash_sha512.hexdigest()}) is different to sha512 from arista ({hash_expected})')
+ logger.warning(
+ f"Downloaded file is corrupt: local sha512 ({hash_sha512.hexdigest()}) is different to sha512 from arista ({hash_expected})"
+ )
return False
- def _get_folder_tree(self) -> ET.ElementTree:
+ def get_folder_tree(self) -> ET.ElementTree:
"""
_get_folder_tree Download XML tree from Arista server
@@ -237,15 +261,17 @@ class ObjectDownloader():
"""
if self.session_id is None:
self.authenticate()
- jsonpost = {'sessionCode': self.session_id}
- result = requests.post(ARISTA_SOFTWARE_FOLDER_TREE, data=json.dumps(jsonpost), timeout=self.timeout)
+ jsonpost = {"sessionCode": self.session_id}
+ result = requests.post(
+ ARISTA_SOFTWARE_FOLDER_TREE, data=json.dumps(jsonpost), timeout=self.timeout
+ )
try:
folder_tree = result.json()["data"]["xml"]
return ET.ElementTree(ET.fromstring(folder_tree))
except KeyError as error:
logger.error(MSG_INVALID_DATA)
- logger.error(f'Server returned: {error}')
- console.print(f'❌ {MSG_INVALID_DATA}', style="bold red")
+ logger.error(f"Server returned: {error}")
+ console.print(f"❌ {MSG_INVALID_DATA}", style="bold red")
sys.exit(1)
def _get_remote_filepath(self) -> str:
@@ -259,12 +285,14 @@ class ObjectDownloader():
str
Remote path of the file to download
"""
- root = self._get_folder_tree()
+ root = self.get_folder_tree()
logger.debug("GET XML content from ARISTA.com")
xpath = f'.//dir[@label="{self.software}"]//file'
- return self._parse_xml_for_path(root_xml=root, xpath=xpath, search_file=self.filename)
+ return self._parse_xml_for_path(
+ root_xml=root, xpath=xpath, search_file=self.filename
+ )
- def _get_remote_hashpath(self, hash_method: str = 'md5sum') -> str:
+ def _get_remote_hashpath(self, hash_method: str = "md5sum") -> str:
"""
_get_remote_hashpath Helper to get path of the hash's file to download
@@ -275,16 +303,16 @@ class ObjectDownloader():
str
Remote path of the hash's file to download
"""
- root = self._get_folder_tree()
+ root = self.get_folder_tree()
logger.debug("GET XML content from ARISTA.com")
xpath = f'.//dir[@label="{self.software}"]//file'
return self._parse_xml_for_path(
root_xml=root,
xpath=xpath,
- search_file=f'{self.filename}.{hash_method}',
+ search_file=f"{self.filename}.{hash_method}",
)
- def _get_url(self, remote_file_path: str) -> str:
+ def _get_url(self, remote_file_path: str) -> str:
"""
_get_url Get URL to use for downloading file from Arista server
@@ -302,13 +330,15 @@ class ObjectDownloader():
"""
if self.session_id is None:
self.authenticate()
- jsonpost = {'sessionCode': self.session_id, 'filePath': remote_file_path}
- result = requests.post(ARISTA_DOWNLOAD_URL, data=json.dumps(jsonpost), timeout=self.timeout)
- if 'data' in result.json() and 'url' in result.json()['data']:
+ jsonpost = {"sessionCode": self.session_id, "filePath": remote_file_path}
+ result = requests.post(
+ ARISTA_DOWNLOAD_URL, data=json.dumps(jsonpost), timeout=self.timeout
+ )
+ if "data" in result.json() and "url" in result.json()["data"]:
# logger.debug('URL to download file is: {}', result.json())
return result.json()["data"]["url"]
- logger.critical(f'Server returns following message: {result.json()}')
- return ''
+ logger.critical(f"Server returns following message: {result.json()}")
+ return ""
@staticmethod
def _download_file_raw(url: str, file_path: str) -> str:
@@ -331,31 +361,40 @@ class ObjectDownloader():
"""
chunkSize = 1024
r = requests.get(url, stream=True, timeout=5)
- with open(file_path, 'wb') as f:
- pbar = tqdm(unit="B", total=int(r.headers['Content-Length']), unit_scale=True, unit_divisor=1024)
+ with open(file_path, "wb") as f:
+ pbar = tqdm(
+ unit="B",
+ total=int(r.headers["Content-Length"]),
+ unit_scale=True,
+ unit_divisor=1024,
+ )
for chunk in r.iter_content(chunk_size=chunkSize):
if chunk:
pbar.update(len(chunk))
f.write(chunk)
return file_path
- def _download_file(self, file_path: str, filename: str, rich_interface: bool = True) -> Union[None, str]:
+ def _download_file(
+ self, file_path: str, filename: str, rich_interface: bool = True
+ ) -> Union[None, str]:
remote_file_path = self._get_remote_filepath()
- logger.info(f'File found on arista server: {remote_file_path}')
+ logger.info(f"File found on arista server: {remote_file_path}")
file_url = self._get_url(remote_file_path=remote_file_path)
if file_url is not False:
if not rich_interface:
- return self._download_file_raw(url=file_url, file_path=os.path.join(file_path, filename))
+ return self._download_file_raw(
+ url=file_url, file_path=os.path.join(file_path, filename)
+ )
rich_downloader = DownloadProgressBar()
rich_downloader.download(urls=[file_url], dest_dir=file_path)
return os.path.join(file_path, filename)
- logger.error(f'Cannot download file {file_path}')
+ logger.error(f"Cannot download file {file_path}")
return None
@staticmethod
def _create_destination_folder(path: str) -> None:
# os.makedirs(path, mode, exist_ok=True)
- os.system(f'mkdir -p {path}')
+ os.system(f"mkdir -p {path}")
@staticmethod
def _disable_ztp(file_path: str) -> None:
@@ -379,24 +418,29 @@ class ObjectDownloader():
"""
credentials = (base64.b64encode(self.token.encode())).decode("utf-8")
session_code_url = ARISTA_GET_SESSION
- jsonpost = {'accessToken': credentials}
+ jsonpost = {"accessToken": credentials}
- result = requests.post(session_code_url, data=json.dumps(jsonpost), timeout=self.timeout)
+ result = requests.post(
+ session_code_url, data=json.dumps(jsonpost), timeout=self.timeout
+ )
- if result.json()["status"]["message"] in[ 'Access token expired', 'Invalid access token']:
- console.print(f'❌ {MSG_TOKEN_EXPIRED}', style="bold red")
+ if result.json()["status"]["message"] in [
+ "Access token expired",
+ "Invalid access token",
+ ]:
+ console.print(f"❌ {MSG_TOKEN_EXPIRED}", style="bold red")
logger.error(MSG_TOKEN_EXPIRED)
return False
try:
- if 'data' in result.json():
+ if "data" in result.json():
self.session_id = result.json()["data"]["session_code"]
- logger.info('Authenticated on arista.com')
+ logger.info("Authenticated on arista.com")
return True
- logger.debug(f'{result.json()}')
+ logger.debug(f"{result.json()}")
return False
except KeyError as error_arista:
- logger.error(f'Error: {error_arista}')
+ logger.error(f"Error: {error_arista}")
sys.exit(1)
def download_local(self, file_path: str, checksum: bool = False) -> bool:
@@ -422,25 +466,33 @@ class ObjectDownloader():
bool
True if everything went well, False if any problem appears
"""
- file_downloaded = str(self._download_file(file_path=file_path, filename=self.filename))
+ file_downloaded = str(
+ self._download_file(file_path=file_path, filename=self.filename)
+ )
# Check file HASH
hash_result = False
if checksum:
- logger.info('🚀 Running checksum validation')
- console.print('🚀 Running checksum validation')
- if self.hash_method == 'md5sum':
+ logger.info("🚀 Running checksum validation")
+ console.print("🚀 Running checksum validation")
+ if self.hash_method == "md5sum":
hash_expected = self._get_hash(file_path=file_path)
- hash_result = self._compute_hash_md5sum(file=file_downloaded, hash_expected=hash_expected)
- elif self.hash_method == 'sha512sum':
+ hash_result = self._compute_hash_md5sum(
+ file=file_downloaded, hash_expected=hash_expected
+ )
+ elif self.hash_method == "sha512sum":
hash_expected = self._get_hash(file_path=file_path)
- hash_result = self._compute_hash_sh512sum(file=file_downloaded, hash_expected=hash_expected)
+ hash_result = self._compute_hash_sh512sum(
+ file=file_downloaded, hash_expected=hash_expected
+ )
if not hash_result:
- logger.error('Downloaded file is corrupted, please check your connection')
- console.print('❌ Downloaded file is corrupted, please check your connection')
+ logger.error("Downloaded file is corrupted, please check your connection")
+ console.print(
+ "❌ Downloaded file is corrupted, please check your connection"
+ )
return False
- logger.info('Downloaded file is correct.')
- console.print('✅ Downloaded file is correct.')
+ logger.info("Downloaded file is correct.")
+ console.print("✅ Downloaded file is correct.")
return True
def provision_eve(self, noztp: bool = False, checksum: bool = True) -> None:
@@ -466,7 +518,7 @@ class ObjectDownloader():
# Build image name to use in folder path
eos_image_name = self.filename.rstrip(".vmdk").lower()
if noztp:
- eos_image_name = f'{eos_image_name}-noztp'
+ eos_image_name = f"{eos_image_name}-noztp"
# Create full path for EVE-NG
file_path = os.path.join(EVE_QEMU_FOLDER_PATH, eos_image_name.rstrip())
# Create folders in filesystem
@@ -474,20 +526,23 @@ class ObjectDownloader():
# Download file to local destination
file_downloaded = self._download_file(
- file_path=file_path, filename=self.filename)
+ file_path=file_path, filename=self.filename
+ )
# Convert to QCOW2 format
file_qcow2 = os.path.join(file_path, "hda.qcow2")
- logger.info('Converting VMDK to QCOW2 format')
- console.print('🚀 Converting VMDK to QCOW2 format...')
+ logger.info("Converting VMDK to QCOW2 format")
+ console.print("🚀 Converting VMDK to QCOW2 format...")
- os.system(f'$(which qemu-img) convert -f vmdk -O qcow2 {file_downloaded} {file_qcow2}')
+ os.system(
+ f"$(which qemu-img) convert -f vmdk -O qcow2 {file_downloaded} {file_qcow2}"
+ )
- logger.info('Applying unl_wrapper to fix permissions')
- console.print('Applying unl_wrapper to fix permissions')
+ logger.info("Applying unl_wrapper to fix permissions")
+ console.print("Applying unl_wrapper to fix permissions")
- os.system('/opt/unetlab/wrappers/unl_wrapper -a fixpermissions')
- os.system(f'rm -f {file_downloaded}')
+ os.system("/opt/unetlab/wrappers/unl_wrapper -a fixpermissions")
+ os.system(f"rm -f {file_downloaded}")
if noztp:
self._disable_ztp(file_path=file_path)
@@ -502,12 +557,12 @@ class ObjectDownloader():
version (str):
image_name (str, optional): Image name to use. Defaults to "arista/ceos".
"""
- docker_image = f'{image_name}:{self.version}'
- logger.info(f'Importing image {self.filename} to {docker_image}')
- console.print(f'🚀 Importing image {self.filename} to {docker_image}')
- os.system(f'$(which docker) import {self.filename} {docker_image}')
- for filename in glob.glob(f'{self.filename}*'):
+ docker_image = f"{image_name}:{self.version}"
+ logger.info(f"Importing image {self.filename} to {docker_image}")
+ console.print(f"🚀 Importing image {self.filename} to {docker_image}")
+ os.system(f"$(which docker) import {self.filename} {docker_image}")
+ for filename in glob.glob(f"{self.filename}*"):
try:
os.remove(filename)
except FileNotFoundError:
- console.print(f'File not found: {filename}')
+ console.print(f"File not found: {filename}")