From 26cac87f1cb90abc2fac2144a39ad88b3b70031d Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Tue, 10 Oct 2023 11:54:13 +0200 Subject: Adding upstream version 0.9.0. Signed-off-by: Daniel Baumann --- .github/workflows/on_demand.yml | 20 +-- .github/workflows/pr-management.yml | 22 +++- .github/workflows/release.yml | 24 ++-- .pre-commit-config.yaml | 64 ++++++++++ README.md | 31 ++++- bin/README.md | 111 ----------------- bin/cvp-upload | 56 --------- bin/eos-download | 86 ------------- eos_downloader/__init__.py | 27 ++-- eos_downloader/cli/cli.py | 48 ++++---- eos_downloader/cli/debug/commands.py | 41 +++++-- eos_downloader/cli/get/commands.py | 211 ++++++++++++++++++++++++-------- eos_downloader/cli/info/commands.py | 86 ++++++++++--- eos_downloader/cli/utils.py | 38 ++++++ eos_downloader/cvp.py | 85 ++++++++----- eos_downloader/data.py | 92 +++----------- eos_downloader/download.py | 33 +++-- eos_downloader/eos.py | 68 +++++++---- eos_downloader/models/version.py | 71 ++++++----- eos_downloader/object_downloader.py | 223 +++++++++++++++++++++------------- pylintrc | 3 +- pyproject.toml | 8 +- tests/lib/dataset.py | 143 +++++++++++----------- tests/lib/fixtures.py | 35 ++++-- tests/lib/helpers.py | 13 +- tests/system/test_eos_download.py.old | 1 - tests/unit/test_eos_version.py | 152 ++++++++++++++--------- tests/unit/test_object_downloader.py | 128 +++++++++++-------- 28 files changed, 1067 insertions(+), 853 deletions(-) create mode 100644 .pre-commit-config.yaml delete mode 100644 bin/README.md delete mode 100755 bin/cvp-upload delete mode 100755 bin/eos-download create mode 100644 eos_downloader/cli/utils.py diff --git a/.github/workflows/on_demand.yml b/.github/workflows/on_demand.yml index 1e41be2..17223f9 100644 --- a/.github/workflows/on_demand.yml +++ b/.github/workflows/on_demand.yml @@ -14,11 +14,11 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Docker meta for TAG id: meta - uses: docker/metadata-action@v4 + uses: docker/metadata-action@v5 with: images: | ${{ secrets.DOCKER_IMAGE }} @@ -27,20 +27,20 @@ jobs: type=raw,value=${{ inputs.tag }} - name: Login to DockerHub - uses: docker/login-action@v2 + uses: docker/login-action@v3 with: username: ${{ secrets.DOCKERHUB_USERNAME }} password: ${{ secrets.DOCKER_PASSWORD }} - name: Login to GitHub Container Registry - uses: docker/login-action@v2 + uses: docker/login-action@v3 with: registry: ghcr.io username: ${{ github.repository_owner }} password: ${{ secrets.GITHUB_TOKEN }} - name: Build and push - uses: docker/build-push-action@v4 + uses: docker/build-push-action@v5 with: context: . file: Dockerfile @@ -54,11 +54,11 @@ jobs: needs: [docker] steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Docker meta for TAG id: meta - uses: docker/metadata-action@v4 + uses: docker/metadata-action@v5 with: images: | ${{ secrets.DOCKER_IMAGE }} @@ -67,20 +67,20 @@ jobs: type=raw,value=${{ inputs.tag }}-dind - name: Login to DockerHub - uses: docker/login-action@v2 + uses: docker/login-action@v3 with: username: ${{ secrets.DOCKERHUB_USERNAME }} password: ${{ secrets.DOCKER_PASSWORD }} - name: Login to GitHub Container Registry - uses: docker/login-action@v2 + uses: docker/login-action@v3 with: registry: ghcr.io username: ${{ github.repository_owner }} password: ${{ secrets.GITHUB_TOKEN }} - name: Build and push - uses: docker/build-push-action@v4 + uses: docker/build-push-action@v5 with: context: . file: Dockerfile.docker diff --git a/.github/workflows/pr-management.yml b/.github/workflows/pr-management.yml index 233166e..dc30ccf 100644 --- a/.github/workflows/pr-management.yml +++ b/.github/workflows/pr-management.yml @@ -8,15 +8,29 @@ on: types: [assigned, opened, synchronize, reopened] jobs: + + pre-commit: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.8", "3.9", "3.10"] + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - uses: pre-commit-ci/lite-action@v1.0.1 + compiling: name: Run installation process and code compilation supported Python versions runs-on: ubuntu-latest + needs: [pre-commit] strategy: matrix: python-version: ["3.8", "3.9", "3.10"] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v4 @@ -44,7 +58,7 @@ jobs: python: ["3.8", "3.9", "3.10"] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Setup Python uses: actions/setup-python@v4 @@ -67,7 +81,7 @@ jobs: python: ["3.8", "3.9", "3.10"] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Setup Python uses: actions/setup-python@v4 @@ -90,7 +104,7 @@ jobs: python: ["3.8", "3.9", "3.10"] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Setup Python uses: actions/setup-python@v4 diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 622a85d..e148e6b 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -11,7 +11,7 @@ jobs: # runs-on: ubuntu-latest # steps: # - name: Checkout code - # uses: actions/checkout@v3 + # uses: actions/checkout@v4 # with: # fetch-depth: 0 @@ -36,7 +36,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Install dependencies run: | @@ -59,11 +59,11 @@ jobs: needs: [pypi] steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Docker meta for TAG id: meta - uses: docker/metadata-action@v4 + uses: docker/metadata-action@v5 with: images: | ${{ secrets.DOCKER_IMAGE }} @@ -73,20 +73,20 @@ jobs: type=raw,value=latest - name: Login to DockerHub - uses: docker/login-action@v2 + uses: docker/login-action@v3 with: username: ${{ secrets.DOCKERHUB_USERNAME }} password: ${{ secrets.DOCKER_PASSWORD }} - name: Login to GitHub Container Registry - uses: docker/login-action@v2 + uses: docker/login-action@v3 with: registry: ghcr.io username: ${{ github.repository_owner }} password: ${{ secrets.GITHUB_TOKEN }} - name: Build and push - uses: docker/build-push-action@v4 + uses: docker/build-push-action@v5 with: context: . file: Dockerfile @@ -100,11 +100,11 @@ jobs: needs: [docker] steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Docker meta for TAG id: meta - uses: docker/metadata-action@v4 + uses: docker/metadata-action@v5 with: images: | ${{ secrets.DOCKER_IMAGE }} @@ -114,20 +114,20 @@ jobs: type=raw,value=latest-dind - name: Login to DockerHub - uses: docker/login-action@v2 + uses: docker/login-action@v3 with: username: ${{ secrets.DOCKERHUB_USERNAME }} password: ${{ secrets.DOCKER_PASSWORD }} - name: Login to GitHub Container Registry - uses: docker/login-action@v2 + uses: docker/login-action@v3 with: registry: ghcr.io username: ${{ github.repository_owner }} password: ${{ secrets.GITHUB_TOKEN }} - name: Build and push - uses: docker/build-push-action@v4 + uses: docker/build-push-action@v5 with: context: . file: Dockerfile.docker diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..513a2cc --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,64 @@ +--- +# See https://pre-commit.com for more information +# See https://pre-commit.com/hooks.html for more hooks +files: ^(eos_downloader)/ + +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.4.0 + hooks: + - id: trailing-whitespace + - id: end-of-file-fixer + - id: check-added-large-files + - id: check-merge-conflict + + # - repo: https://github.com/pycqa/isort + # rev: 5.12.0 + # hooks: + # - id: isort + # name: Check for changes when running isort on all python files + + - repo: https://github.com/psf/black + rev: 23.7.0 + hooks: + - id: black + name: Check for changes when running Black on all python files + + - repo: https://github.com/pycqa/flake8 + rev: 6.0.0 + hooks: + - id: flake8 + name: Check for PEP8 error on Python files + args: + - --config=/dev/null + - --max-line-length=165 + + - repo: local # as per https://pylint.pycqa.org/en/latest/user_guide/installation/pre-commit-integration.html + hooks: + - id: pylint + entry: pylint + language: python + name: Check for Linting error on Python files + description: This hook runs pylint. + types: [python] + args: + - -rn # Only display messages + - -sn # Don't display the score + - --rcfile=pylintrc # Link to config file + + - repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.4.1 + hooks: + - id: mypy + args: + - --config-file=pyproject.toml + additional_dependencies: + - "click==8.1.3" + - "click-help-colors==0.9.1" + - "pydantic~=2.0" + - "PyYAML==6.0" + - "requests>=2.27" + - "rich~=13.4" + - types-paramiko + - types-requests + files: eos_downloader diff --git a/README.md b/README.md index b5abbc3..0809624 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,13 @@ - [![code-testing](https://github.com/titom73/eos-downloader/actions/workflows/pr-management.yml/badge.svg?event=push)](https://github.com/titom73/eos-downloader/actions/workflows/pr-management.yml) ![PyPI - Python Version](https://img.shields.io/pypi/pyversions/eos-downloader) ![GitHub release (latest SemVer)](https://img.shields.io/github/v/release/titom73/arista-downloader) ![PyPI - Downloads/month](https://img.shields.io/pypi/dm/eos-downloader) +[![tests](https://github.com/titom73/eos-downloader/actions/workflows/pr-management.yml/badge.svg?event=push)](https://github.com/titom73/eos-downloader/actions/workflows/pr-management.yml) +![PyPI - Python Version](https://img.shields.io/pypi/pyversions/eos-downloader) +[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) +![Checked with mypy](http://www.mypy-lang.org/static/mypy_badge.svg) +![GitHub release](https://img.shields.io/github/v/release/titom73/arista-downloader) +![PyPI - Downloads/month](https://img.shields.io/pypi/dm/eos-downloader) + + # Arista Software Downloader @@ -19,6 +28,7 @@ Usage: ardl [OPTIONS] COMMAND [ARGS]... Arista Network Download CLI Options: + --version Show the version and exit. --token TEXT Arista Token from your customer account [env var: ARISTA_TOKEN] --help Show this message and exit. @@ -26,7 +36,6 @@ Options: Commands: debug Debug commands to work with ardl get Download Arista from Arista website - version Display version of ardl ``` > **Warning** @@ -35,10 +44,22 @@ Commands: ### Download EOS Package - +> **Note** > Supported packages are: EOS, cEOS, vEOS-lab, cEOS64 -You can download EOS packages with following commands: +CLI gives an option to get latest version available. By default it takes latest `F` release + +```bash +ardl get eos --image-type cEOS --latest +``` + +If you want to get latest M release, you can use `--release-type`: + +```bash +ardl get eos --image-type cEOS --release-type M --latest +``` + +You can download a specific EOS packages with following commands: ```bash # Example for a cEOS package @@ -164,7 +185,7 @@ tqdm On EVE-NG, you may have to install/upgrade __pyOpenSSL__ in version `23.0.0`: -``` +```bash # Error when running ardl: AttributeError: module 'lib' has no attribute 'X509_V_FLAG_CB_ISSUER_CHECK' $ pip install pyopenssl --upgrade diff --git a/bin/README.md b/bin/README.md deleted file mode 100644 index 7509633..0000000 --- a/bin/README.md +++ /dev/null @@ -1,111 +0,0 @@ -## scripts - -These scripts are deprecated and will be removed in a futur version. Please prefer the use of the CLI implemented in the package. - -### eos-download - -```bash -usage: eos-download [-h] - --version VERSION - [--token TOKEN] - [--image IMAGE] - [--destination DESTINATION] - [--eve] - [--noztp] - [--import_docker] - [--docker_name DOCKER_NAME] - [--verbose VERBOSE] - [--log] - -EOS downloader script. - -optional arguments: - -h, --help show this help message and exit - --token TOKEN arista.com user API key - can use ENV:ARISTA_TOKEN - --image IMAGE Type of EOS image required - --version VERSION EOS version to download from website - --destination DESTINATION - Path where to save EOS package downloaded - --eve Option to install EOS package to EVE-NG - --noztp Option to deactivate ZTP when used with EVE-NG - --import_docker Option to import cEOS image to docker - --docker_name DOCKER_NAME - Docker image name to use - --verbose VERBOSE Script verbosity - --log Option to activate logging to eos-downloader.log file -``` - -- Token are read from `ENV:ARISTA_TOKEN` unless you specify a specific token with CLI. - -- Supported platforms: - - - `INT`: International version - - `64`: 64 bits version - - `2GB` for 2GB flash platform - - `2GB-INT`: for 2GB running International - - `vEOS`: Virtual EOS image - - `vEOS-lab`: Virtual Lab EOS - - `vEOS64-lab`: Virtual Lab EOS running 64B - - `cEOS`: Docker version of EOS - - `cEOS64`: Docker version of EOS running in 64 bits - -#### Examples - -- Download vEOS-lab image and install in EVE-NG - -```bash -$ eos-download --image vEOS-lab --version 4.25.7M --eve --noztp -``` - -- Download Docker image - -```bash -$ eos-download --image cEOS --version 4.27.1F -šŸŖ eos-downloader is starting... - - Image Type: cEOS - - Version: 4.27.2F -āœ… Authenticated on arista.com -šŸ”Ž Searching file cEOS-lab-4.27.2F.tar.xz - -> Found file at /support/download/EOS-USA/Active Releases/4.27/EOS-4.27.2F/cEOS-lab/cEOS-lab-4.27.2F.tar.xz -šŸ’¾ Downloading cEOS-lab-4.27.2F.tar.xz ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā” 100.0% ā€¢ 17.1 MB/s ā€¢ 451.6/451.6 MB ā€¢ 0:00:19 ā€¢ -šŸš€ Running checksum validation -šŸ”Ž Searching file cEOS-lab-4.27.2F.tar.xz.sha512sum - -> Found file at /support/download/EOS-USA/Active -Releases/4.27/EOS-4.27.2F/cEOS-lab/cEOS-lab-4.27.2F.tar.xz.sha512sum -šŸ’¾ Downloading cEOS-lab-4.27.2F.tar.xz.sha512sum ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā” 100.0% ā€¢ ? ā€¢ 154/154 bytes ā€¢ 0:00:00 ā€¢ -āœ… Downloaded file is correct. -``` - -__Note:__ `ARISTA_TOKEN` should be set in your .profile and not set for each command. If not set, you can use `--token` knob. - -```bash -# Export Token -export ARISTA_TOKEN="xxxxxxx" -``` - -### Cloudvision Image uploader - -Create an image bundle on Cloudvision. - -```bash -cvp-upload -h -usage: cvp-upload [-h] - [--token TOKEN] - [--image IMAGE] - --cloudvision CLOUDVISION - [--create_bundle] - [--timeout TIMEOUT] - [--verbose VERBOSE] - -Cloudvision Image uploader script. - -optional arguments: - -h, --help show this help message and exit - --token TOKEN CVP Authentication token - can use ENV:ARISTA_AVD_CV_TOKEN - --image IMAGE Type of EOS image required - --cloudvision CLOUDVISION - Cloudvision instance where to upload image - --create_bundle Option to create image bundle with new uploaded image - --timeout TIMEOUT Timeout connection. Default is set to 1200sec - --verbose VERBOSE Script verbosity -``` \ No newline at end of file diff --git a/bin/cvp-upload b/bin/cvp-upload deleted file mode 100755 index 74213fe..0000000 --- a/bin/cvp-upload +++ /dev/null @@ -1,56 +0,0 @@ -#!/usr/bin/python - -import sys -import os -import argparse -from eos_downloader.cvp import CvFeatureManager, CvpAuthenticationItem -from loguru import logger - -ARISTA_AVD_CV_TOKEN = os.getenv('ARISTA_AVD_CV_TOKEN', '') - - -def read_cli(): - parser = argparse.ArgumentParser(description='Cloudvision Image uploader script.') - parser.add_argument('--token', required=False, - default=ARISTA_AVD_CV_TOKEN, - help='CVP Authentication token - can use ENV:ARISTA_AVD_CV_TOKEN') - parser.add_argument('--image', required=False, - default='EOS', help='Type of EOS image required') - parser.add_argument('--cloudvision', required=True, - help='Cloudvision instance where to upload image') - parser.add_argument('--create_bundle', required=False, action='store_true', - help="Option to create image bundle with new uploaded image") - parser.add_argument('--timeout', required=False, - default=1200, - help='Timeout connection. Default is set to 1200sec') - parser.add_argument('--verbose', required=False, - default='info', help='Script verbosity') - return parser.parse_args() - - -if __name__ == '__main__': - - cli_options = read_cli() - - logger.remove() - logger.add(sys.stderr, level=str(cli_options.verbose).upper()) - - cv_authentication = CvpAuthenticationItem( - server=cli_options.cloudvision, - token=cli_options.token, - port=443, - timeout=cli_options.timeout, - validate_cert=False - ) - - my_cvp_uploader = CvFeatureManager(authentication=cv_authentication) - result_upload = my_cvp_uploader.upload_image(cli_options.image) - if result_upload and cli_options.create_bundle: - bundle_name = os.path.basename(cli_options.image) - logger.info('Creating image bundle {}'.format(bundle_name)) - my_cvp_uploader.create_bundle( - name=bundle_name, - images_name=[bundle_name] - ) - - sys.exit(0) diff --git a/bin/eos-download b/bin/eos-download deleted file mode 100755 index 9826b31..0000000 --- a/bin/eos-download +++ /dev/null @@ -1,86 +0,0 @@ -#!/usr/bin/python - -import sys -import os -import argparse -import eos_downloader.eos -from loguru import logger -from rich.console import Console - -ARISTA_TOKEN = os.getenv('ARISTA_TOKEN', '') - - -def read_cli(): - parser = argparse.ArgumentParser(description='EOS downloader script.') - parser.add_argument('--token', required=False, - default=ARISTA_TOKEN, - help='arista.com user API key - can use ENV:ARISTA_TOKEN') - parser.add_argument('--image', required=False, - default='EOS', help='Type of EOS image required') - parser.add_argument('--version', required=True, - default='', help='EOS version to download from website') - - parser.add_argument('--destination', required=False, - default=str(os.getcwd()), - help='Path where to save EOS package downloaded') - - parser.add_argument('--eve', required=False, action='store_true', - help="Option to install EOS package to EVE-NG") - parser.add_argument('--noztp', required=False, action='store_true', - help="Option to deactivate ZTP when used with EVE-NG") - - parser.add_argument('--import_docker', required=False, action='store_true', - help="Option to import cEOS image to docker") - parser.add_argument('--docker_name', required=False, - default='arista/ceos', - help='Docker image name to use') - - parser.add_argument('--verbose', required=False, - default='info', help='Script verbosity') - parser.add_argument('--log', required=False, action='store_true', - help="Option to activate logging to eos-downloader.log file") - - return parser.parse_args() - - -if __name__ == '__main__': - - cli_options = read_cli() - - console = Console() - - console.print('\n[red]WARNING: This script is now deprecated. Please use ardl cli instead[/red]\n\n') - - if cli_options.token is None or cli_options.token == '': - console.print('\nā— Token is unset ! Please configure ARISTA_TOKEN or use --token option', style="bold red") - sys.exit(1) - - logger.remove() - if cli_options.log: - logger.add("eos-downloader.log", rotation="10 MB", level=str(cli_options.verbose).upper()) - - console.print("šŸŖ [bold blue]eos-downloader[/bold blue] is starting...", ) - console.print(f' - Image Type: {cli_options.image}') - console.print(f' - Version: {cli_options.version}') - - - my_download = eos_downloader.eos.EOSDownloader( - image=cli_options.image, - software='EOS', - version=cli_options.version, - token=cli_options.token, - hash_method='sha512sum') - - my_download.authenticate() - - if cli_options.eve: - my_download.provision_eve(noztp=cli_options.noztp, checksum=True) - else: - my_download.download_local(file_path=cli_options.destination, checksum=True) - - if cli_options.import_docker: - my_download.docker_import( - image_name=cli_options.docker_name - ) - console.print('āœ… processing done !') - sys.exit(0) diff --git a/eos_downloader/__init__.py b/eos_downloader/__init__.py index 345ccf7..4507a70 100644 --- a/eos_downloader/__init__.py +++ b/eos_downloader/__init__.py @@ -5,23 +5,31 @@ EOS Downloader module. """ -from __future__ import (absolute_import, division, - print_function, unicode_literals, annotations) +from __future__ import ( + absolute_import, + annotations, + division, + print_function, + unicode_literals, +) + import dataclasses -from typing import Any -import json import importlib.metadata +import json +from typing import Any -__author__ = '@titom73' -__email__ = 'tom@inetsix.net' -__date__ = '2022-03-16' +__author__ = "@titom73" +__email__ = "tom@inetsix.net" +__date__ = "2022-03-16" __version__ = importlib.metadata.version("eos-downloader") # __all__ = ["CvpAuthenticationItem", "CvFeatureManager", "EOSDownloader", "ObjectDownloader", "reverse"] ARISTA_GET_SESSION = "https://www.arista.com/custom_data/api/cvp/getSessionCode/" -ARISTA_SOFTWARE_FOLDER_TREE = "https://www.arista.com/custom_data/api/cvp/getFolderTree/" +ARISTA_SOFTWARE_FOLDER_TREE = ( + "https://www.arista.com/custom_data/api/cvp/getFolderTree/" +) ARISTA_DOWNLOAD_URL = "https://www.arista.com/custom_data/api/cvp/getDownloadLink/" @@ -36,11 +44,12 @@ check the Access Token. Then re-run the script with the correct token. MSG_INVALID_DATA = """Invalid data returned by server """ -EVE_QEMU_FOLDER_PATH = '/opt/unetlab/addons/qemu/' +EVE_QEMU_FOLDER_PATH = "/opt/unetlab/addons/qemu/" class EnhancedJSONEncoder(json.JSONEncoder): """Custom JSon encoder.""" + def default(self, o: Any) -> Any: if dataclasses.is_dataclass(o): return dataclasses.asdict(o) diff --git a/eos_downloader/cli/cli.py b/eos_downloader/cli/cli.py index ddd0dea..ad77f2b 100644 --- a/eos_downloader/cli/cli.py +++ b/eos_downloader/cli/cli.py @@ -11,49 +11,51 @@ ARDL CLI Baseline. """ import click -from rich.console import Console -import eos_downloader -from eos_downloader.cli.get import commands as get_commands + +from eos_downloader import __version__ from eos_downloader.cli.debug import commands as debug_commands +from eos_downloader.cli.get import commands as get_commands from eos_downloader.cli.info import commands as info_commands +from eos_downloader.cli.utils import AliasedGroup + -@click.group() +@click.group(cls=AliasedGroup) +@click.version_option(__version__) @click.pass_context -@click.option('--token', show_envvar=True, default=None, help='Arista Token from your customer account') +@click.option( + "--token", + show_envvar=True, + default=None, + help="Arista Token from your customer account", +) def ardl(ctx: click.Context, token: str) -> None: """Arista Network Download CLI""" ctx.ensure_object(dict) - ctx.obj['token'] = token + ctx.obj["token"] = token -@click.command() -def version() -> None: - """Display version of ardl""" - console = Console() - console.print(f'ardl is running version {eos_downloader.__version__}') - - -@ardl.group(no_args_is_help=True) +@ardl.group(cls=AliasedGroup, no_args_is_help=True) @click.pass_context -def get(ctx: click.Context) -> None: +def get(ctx: click.Context, cls: click.Group = AliasedGroup) -> None: # pylint: disable=redefined-builtin """Download Arista from Arista website""" -@ardl.group(no_args_is_help=True) +@ardl.group(cls=AliasedGroup, no_args_is_help=True) @click.pass_context -def info(ctx: click.Context) -> None: +def info(ctx: click.Context, cls: click.Group = AliasedGroup) -> None: # pylint: disable=redefined-builtin """List information from Arista website""" -@ardl.group(no_args_is_help=True) +@ardl.group(cls=AliasedGroup, no_args_is_help=True) @click.pass_context -def debug(ctx: click.Context) -> None: +def debug(ctx: click.Context, cls: click.Group = AliasedGroup) -> None: # pylint: disable=redefined-builtin """Debug commands to work with ardl""" + # ANTA CLI Execution @@ -64,13 +66,9 @@ def cli() -> None: get.add_command(get_commands.cvp) info.add_command(info_commands.eos_versions) debug.add_command(debug_commands.xml) - ardl.add_command(version) # Load CLI - ardl( - obj={}, - auto_envvar_prefix='arista' - ) + ardl(obj={}, auto_envvar_prefix="arista") -if __name__ == '__main__': +if __name__ == "__main__": cli() diff --git a/eos_downloader/cli/debug/commands.py b/eos_downloader/cli/debug/commands.py index 107b8a0..5a0d7f8 100644 --- a/eos_downloader/cli/debug/commands.py +++ b/eos_downloader/cli/debug/commands.py @@ -22,32 +22,51 @@ import eos_downloader.eos @click.command() @click.pass_context -@click.option('--output', default=str('arista.xml'), help='Path to save XML file', type=click.Path(), show_default=True) -@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False)) +@click.option( + "--output", + default=str("arista.xml"), + help="Path to save XML file", + type=click.Path(), + show_default=True, +) +@click.option( + "--log-level", + "--log", + help="Logging level of the command", + default=None, + type=click.Choice( + ["debug", "info", "warning", "error", "critical"], case_sensitive=False + ), +) def xml(ctx: click.Context, output: str, log_level: str) -> None: # sourcery skip: remove-unnecessary-cast """Extract XML directory structure""" console = Console() # Get from Context - token = ctx.obj['token'] + token = ctx.obj["token"] logger.remove() if log_level is not None: logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper()) my_download = eos_downloader.eos.EOSDownloader( - image='unset', - software='EOS', - version='unset', + image="unset", + software="EOS", + version="unset", token=token, - hash_method='sha512sum') + hash_method="sha512sum", + ) my_download.authenticate() - xml_object: ET.ElementTree = my_download._get_folder_tree() # pylint: disable=protected-access + xml_object: ET.ElementTree = ( + my_download.get_folder_tree() + ) # pylint: disable=protected-access xml_content = xml_object.getroot() - xmlstr = minidom.parseString(ET.tostring(xml_content)).toprettyxml(indent=" ", newl='') - with open(output, "w", encoding='utf-8') as f: + xmlstr = minidom.parseString(ET.tostring(xml_content)).toprettyxml( + indent=" ", newl="" + ) + with open(output, "w", encoding="utf-8") as f: f.write(str(xmlstr)) - console.print(f'XML file saved in: { output }') + console.print(f"XML file saved in: { output }") diff --git a/eos_downloader/cli/get/commands.py b/eos_downloader/cli/get/commands.py index 13a8eec..b4525fe 100644 --- a/eos_downloader/cli/get/commands.py +++ b/eos_downloader/cli/get/commands.py @@ -21,68 +21,156 @@ from rich.console import Console import eos_downloader.eos from eos_downloader.models.version import BASE_VERSION_STR, RTYPE_FEATURE, RTYPES -EOS_IMAGE_TYPE = ['64', 'INT', '2GB-INT', 'cEOS', 'cEOS64', 'vEOS', 'vEOS-lab', 'EOS-2GB', 'default'] -CVP_IMAGE_TYPE = ['ova', 'rpm', 'kvm', 'upgrade'] +EOS_IMAGE_TYPE = [ + "64", + "INT", + "2GB-INT", + "cEOS", + "cEOS64", + "vEOS", + "vEOS-lab", + "EOS-2GB", + "default", +] +CVP_IMAGE_TYPE = ["ova", "rpm", "kvm", "upgrade"] + @click.command(no_args_is_help=True) @click.pass_context -@click.option('--image-type', default='default', help='EOS Image type', type=click.Choice(EOS_IMAGE_TYPE), required=True) -@click.option('--version', default=None, help='EOS version', type=str, required=False) -@click.option('--latest', '-l', is_flag=True, type=click.BOOL, default=False, help='Get latest version in given branch. If --branch is not use, get the latest branch with specific release type') -@click.option('--release-type', '-rtype', type=click.Choice(RTYPES, case_sensitive=False), default=RTYPE_FEATURE, help='EOS release type to search') -@click.option('--branch', '-b', type=click.STRING, default=None, help='EOS Branch to list releases') -@click.option('--docker-name', default='arista/ceos', help='Docker image name (default: arista/ceos)', type=str, show_default=True) -@click.option('--output', default=str(os.path.relpath(os.getcwd(), start=os.curdir)), help='Path to save image', type=click.Path(),show_default=True) +@click.option( + "--image-type", + default="default", + help="EOS Image type", + type=click.Choice(EOS_IMAGE_TYPE), + required=True, +) +@click.option("--version", default=None, help="EOS version", type=str, required=False) +@click.option( + "--latest", + "-l", + is_flag=True, + type=click.BOOL, + default=False, + help="Get latest version in given branch. If --branch is not use, get the latest branch with specific release type", +) +@click.option( + "--release-type", + "-rtype", + type=click.Choice(RTYPES, case_sensitive=False), + default=RTYPE_FEATURE, + help="EOS release type to search", +) +@click.option( + "--branch", + "-b", + type=click.STRING, + default=None, + help="EOS Branch to list releases", +) +@click.option( + "--docker-name", + default="arista/ceos", + help="Docker image name (default: arista/ceos)", + type=str, + show_default=True, +) +@click.option( + "--output", + default=str(os.path.relpath(os.getcwd(), start=os.curdir)), + help="Path to save image", + type=click.Path(), + show_default=True, +) # Debugging -@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False)) +@click.option( + "--log-level", + "--log", + help="Logging level of the command", + default=None, + type=click.Choice( + ["debug", "info", "warning", "error", "critical"], case_sensitive=False + ), +) # Boolean triggers -@click.option('--eve-ng', is_flag=True, help='Run EVE-NG vEOS provisioning (only if CLI runs on an EVE-NG server)', default=False) -@click.option('--disable-ztp', is_flag=True, help='Disable ZTP process in vEOS image (only available with --eve-ng)', default=False) -@click.option('--import-docker', is_flag=True, help='Import docker image (only available with --image_type cEOSlab)', default=False) +@click.option( + "--eve-ng", + is_flag=True, + help="Run EVE-NG vEOS provisioning (only if CLI runs on an EVE-NG server)", + default=False, +) +@click.option( + "--disable-ztp", + is_flag=True, + help="Disable ZTP process in vEOS image (only available with --eve-ng)", + default=False, +) +@click.option( + "--import-docker", + is_flag=True, + help="Import docker image (only available with --image_type cEOSlab)", + default=False, +) def eos( - ctx: click.Context, image_type: str, output: str, log_level: str, eve_ng: bool, disable_ztp: bool, - import_docker: bool, docker_name: str, version: Union[str, None] = None, release_type: str = RTYPE_FEATURE, - latest: bool = False, branch: Union[str,None] = None - ) -> int: + ctx: click.Context, + image_type: str, + output: str, + log_level: str, + eve_ng: bool, + disable_ztp: bool, + import_docker: bool, + docker_name: str, + version: Union[str, None] = None, + release_type: str = RTYPE_FEATURE, + latest: bool = False, + branch: Union[str, None] = None, +) -> int: """Download EOS image from Arista website""" console = Console() # Get from Context - token = ctx.obj['token'] - if token is None or token == '': - console.print('ā— Token is unset ! Please configure ARISTA_TOKEN or use --token option', style="bold red") + token = ctx.obj["token"] + if token is None or token == "": + console.print( + "ā— Token is unset ! Please configure ARISTA_TOKEN or use --token option", + style="bold red", + ) sys.exit(1) logger.remove() if log_level is not None: logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper()) - console.print("šŸŖ [bold blue]eos-downloader[/bold blue] is starting...", ) - console.print(f' - Image Type: {image_type}') - console.print(f' - Version: {version}') - + console.print( + "šŸŖ [bold blue]eos-downloader[/bold blue] is starting...", + ) + console.print(f" - Image Type: {image_type}") + console.print(f" - Version: {version}") if version is not None: my_download = eos_downloader.eos.EOSDownloader( image=image_type, - software='EOS', + software="EOS", version=version, token=token, - hash_method='sha512sum') + hash_method="sha512sum", + ) my_download.authenticate() elif latest: my_download = eos_downloader.eos.EOSDownloader( image=image_type, - software='EOS', - version='unset', + software="EOS", + version="unset", token=token, - hash_method='sha512sum') + hash_method="sha512sum", + ) my_download.authenticate() if branch is None: branch = str(my_download.latest_branch(rtype=release_type).branch) latest_version = my_download.latest_eos(branch, rtype=release_type) if str(latest_version) == BASE_VERSION_STR: - console.print(f'[red]Error[/red], cannot find any version in {branch} for {release_type} release type') + console.print( + f"[red]Error[/red], cannot find any version in {branch} for {release_type} release type" + ) sys.exit(1) my_download.version = str(latest_version) @@ -92,46 +180,71 @@ def eos( my_download.download_local(file_path=output, checksum=True) if import_docker: - my_download.docker_import( - image_name=docker_name - ) - console.print('āœ… processing done !') + my_download.docker_import(image_name=docker_name) + console.print("āœ… processing done !") sys.exit(0) - @click.command(no_args_is_help=True) @click.pass_context -@click.option('--format', default='upgrade', help='CVP Image type', type=click.Choice(CVP_IMAGE_TYPE), required=True) -@click.option('--version', default=None, help='CVP version', type=str, required=True) -@click.option('--output', default=str(os.path.relpath(os.getcwd(), start=os.curdir)), help='Path to save image', type=click.Path(),show_default=True) -@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False)) -def cvp(ctx: click.Context, version: str, format: str, output: str, log_level: str) -> int: +@click.option( + "--format", + default="upgrade", + help="CVP Image type", + type=click.Choice(CVP_IMAGE_TYPE), + required=True, +) +@click.option("--version", default=None, help="CVP version", type=str, required=True) +@click.option( + "--output", + default=str(os.path.relpath(os.getcwd(), start=os.curdir)), + help="Path to save image", + type=click.Path(), + show_default=True, +) +@click.option( + "--log-level", + "--log", + help="Logging level of the command", + default=None, + type=click.Choice( + ["debug", "info", "warning", "error", "critical"], case_sensitive=False + ), +) +def cvp( + ctx: click.Context, version: str, format: str, output: str, log_level: str +) -> int: """Download CVP image from Arista website""" console = Console() # Get from Context - token = ctx.obj['token'] - if token is None or token == '': - console.print('ā— Token is unset ! Please configure ARISTA_TOKEN or use --token option', style="bold red") + token = ctx.obj["token"] + if token is None or token == "": + console.print( + "ā— Token is unset ! Please configure ARISTA_TOKEN or use --token option", + style="bold red", + ) sys.exit(1) logger.remove() if log_level is not None: logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper()) - console.print("šŸŖ [bold blue]eos-downloader[/bold blue] is starting...", ) - console.print(f' - Image Type: {format}') - console.print(f' - Version: {version}') + console.print( + "šŸŖ [bold blue]eos-downloader[/bold blue] is starting...", + ) + console.print(f" - Image Type: {format}") + console.print(f" - Version: {version}") my_download = eos_downloader.eos.EOSDownloader( image=format, - software='CloudVision', + software="CloudVision", version=version, token=token, - hash_method='md5sum') + hash_method="md5sum", + ) my_download.authenticate() my_download.download_local(file_path=output, checksum=False) - console.print('āœ… processing done !') + console.print("āœ… processing done !") sys.exit(0) diff --git a/eos_downloader/cli/info/commands.py b/eos_downloader/cli/info/commands.py index b51003b..64097a1 100644 --- a/eos_downloader/cli/info/commands.py +++ b/eos_downloader/cli/info/commands.py @@ -24,12 +24,53 @@ from eos_downloader.models.version import BASE_VERSION_STR, RTYPE_FEATURE, RTYPE @click.command(no_args_is_help=True) @click.pass_context -@click.option('--latest', '-l', is_flag=True, type=click.BOOL, default=False, help='Get latest version in given branch. If --branch is not use, get the latest branch with specific release type') -@click.option('--release-type', '-rtype', type=click.Choice(RTYPES, case_sensitive=False), default=RTYPE_FEATURE, help='EOS release type to search') -@click.option('--branch', '-b', type=click.STRING, default=None, help='EOS Branch to list releases') -@click.option('--verbose', '-v', is_flag=True, type=click.BOOL, default=False, help='Human readable output. Default is none to use output in script)') -@click.option('--log-level', '--log', help='Logging level of the command', default='warning', type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False)) -def eos_versions(ctx: click.Context, log_level: str, branch: Union[str,None] = None, release_type: str = RTYPE_FEATURE, latest: bool = False, verbose: bool = False) -> None: +@click.option( + "--latest", + "-l", + is_flag=True, + type=click.BOOL, + default=False, + help="Get latest version in given branch. If --branch is not use, get the latest branch with specific release type", +) +@click.option( + "--release-type", + "-rtype", + type=click.Choice(RTYPES, case_sensitive=False), + default=RTYPE_FEATURE, + help="EOS release type to search", +) +@click.option( + "--branch", + "-b", + type=click.STRING, + default=None, + help="EOS Branch to list releases", +) +@click.option( + "--verbose", + "-v", + is_flag=True, + type=click.BOOL, + default=False, + help="Human readable output. Default is none to use output in script)", +) +@click.option( + "--log-level", + "--log", + help="Logging level of the command", + default="warning", + type=click.Choice( + ["debug", "info", "warning", "error", "critical"], case_sensitive=False + ), +) +def eos_versions( + ctx: click.Context, + log_level: str, + branch: Union[str, None] = None, + release_type: str = RTYPE_FEATURE, + latest: bool = False, + verbose: bool = False, +) -> None: # pylint: disable = too-many-branches """ List Available EOS version on Arista.com website. @@ -42,22 +83,23 @@ def eos_versions(ctx: click.Context, log_level: str, branch: Union[str,None] = N """ console = Console() # Get from Context - token = ctx.obj['token'] + token = ctx.obj["token"] logger.remove() if log_level is not None: logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper()) my_download = eos_downloader.eos.EOSDownloader( - image='unset', - software='EOS', - version='unset', + image="unset", + software="EOS", + version="unset", token=token, - hash_method='sha512sum') + hash_method="sha512sum", + ) auth = my_download.authenticate() if verbose and auth: - console.print('āœ… Authenticated on arista.com') + console.print("āœ… Authenticated on arista.com") if release_type is not None: release_type = release_type.upper() @@ -67,21 +109,27 @@ def eos_versions(ctx: click.Context, log_level: str, branch: Union[str,None] = N branch = str(my_download.latest_branch(rtype=release_type).branch) latest_version = my_download.latest_eos(branch, rtype=release_type) if str(latest_version) == BASE_VERSION_STR: - console.print(f'[red]Error[/red], cannot find any version in {branch} for {release_type} release type') + console.print( + f"[red]Error[/red], cannot find any version in {branch} for {release_type} release type" + ) sys.exit(1) if verbose: - console.print(f'Branch {branch} has been selected with release type {release_type}') + console.print( + f"Branch {branch} has been selected with release type {release_type}" + ) if branch is not None: - console.print(f'Latest release for {branch}: {latest_version}') + console.print(f"Latest release for {branch}: {latest_version}") else: - console.print(f'Latest EOS release: {latest_version}') + console.print(f"Latest EOS release: {latest_version}") else: - console.print(f'{ latest_version }') + console.print(f"{ latest_version }") else: versions = my_download.get_eos_versions(branch=branch, rtype=release_type) if verbose: - console.print(f'List of available versions for {branch if branch is not None else "all branches"}') + console.print( + f'List of available versions for {branch if branch is not None else "all branches"}' + ) for version in versions: - console.print(f' ā†’ {str(version)}') + console.print(f" ā†’ {str(version)}") else: pprint([str(version) for version in versions]) diff --git a/eos_downloader/cli/utils.py b/eos_downloader/cli/utils.py new file mode 100644 index 0000000..4a14f53 --- /dev/null +++ b/eos_downloader/cli/utils.py @@ -0,0 +1,38 @@ +#!/usr/bin/python +# coding: utf-8 -*- +# pylint: disable=inconsistent-return-statements + + +""" +Extension for the python ``click`` module +to provide a group or command with aliases. +""" + + +from typing import Any +import click + + +class AliasedGroup(click.Group): + """ + Implements a subclass of Group that accepts a prefix for a command. + If there were a command called push, it would accept pus as an alias (so long as it was unique) + """ + def get_command(self, ctx: click.Context, cmd_name: str) -> Any: + """Documentation to build""" + rv = click.Group.get_command(self, ctx, cmd_name) + if rv is not None: + return rv + matches = [x for x in self.list_commands(ctx) + if x.startswith(cmd_name)] + if not matches: + return None + if len(matches) == 1: + return click.Group.get_command(self, ctx, matches[0]) + ctx.fail(f"Too many matches: {', '.join(sorted(matches))}") + + def resolve_command(self, ctx: click.Context, args: Any) -> Any: + """Documentation to build""" + # always return the full command name + _, cmd, args = super().resolve_command(ctx, args) + return cmd.name, cmd, args diff --git a/eos_downloader/cvp.py b/eos_downloader/cvp.py index 6f14eb0..678daa8 100644 --- a/eos_downloader/cvp.py +++ b/eos_downloader/cvp.py @@ -6,11 +6,12 @@ CVP Uploader content """ import os -from typing import List, Optional, Any from dataclasses import dataclass -from loguru import logger +from typing import Any, List, Optional + from cvprac.cvp_client import CvpClient from cvprac.cvp_client_errors import CvpLoginError +from loguru import logger # from eos_downloader.tools import exc_to_str @@ -20,8 +21,9 @@ from cvprac.cvp_client_errors import CvpLoginError @dataclass class CvpAuthenticationItem: """ - Data structure to represent Cloudvision Authentication + Data structure to represent Cloudvision Authentication """ + server: str port: int = 443 token: Optional[str] = None @@ -29,15 +31,16 @@ class CvpAuthenticationItem: validate_cert: bool = False -class Filer(): +class Filer: # pylint: disable=too-few-public-methods """ Filer Helper for file management """ + def __init__(self, path: str) -> None: self.file_exist = False - self.filename = '' - self.absolute_path = '' + self.filename = "" + self.absolute_path = "" self.relative_path = path if os.path.exists(path): self.file_exist = True @@ -45,13 +48,14 @@ class Filer(): self.absolute_path = os.path.realpath(path) def __repr__(self) -> str: - return self.absolute_path if self.file_exist else '' + return self.absolute_path if self.file_exist else "" -class CvFeatureManager(): +class CvFeatureManager: """ CvFeatureManager Object to interect with Cloudvision """ + def __init__(self, authentication: CvpAuthenticationItem) -> None: """ __init__ Class Creator @@ -86,19 +90,21 @@ class CvFeatureManager(): try: client.connect( nodes=[authentication.server], - username='', - password='', + username="", + password="", api_token=authentication.token, is_cvaas=True, port=authentication.port, cert=authentication.validate_cert, - request_timeout=authentication.timeout + request_timeout=authentication.timeout, ) except CvpLoginError as error_data: - logger.error(f'Cannot connect to Cloudvision server {authentication.server}') - logger.debug(f'Error message: {error_data}') - logger.info('connected to Cloudvision server') - logger.debug(f'Connection info: {authentication}') + logger.error( + f"Cannot connect to Cloudvision server {authentication.server}" + ) + logger.debug(f"Error message: {error_data}") + logger.info("connected to Cloudvision server") + logger.debug(f"Connection info: {authentication}") return client def __get_images(self) -> List[Any]: @@ -111,8 +117,8 @@ class CvFeatureManager(): Fact returned by Cloudvision """ images = [] - logger.debug(' -> Collecting images') - images = self._cv_instance.api.get_images()['data'] + logger.debug(" -> Collecting images") + images = self._cv_instance.api.get_images()["data"] return images if self.__check_api_result(images) else [] # def __get_bundles(self): @@ -161,7 +167,11 @@ class CvFeatureManager(): bool True if present """ - return any(image_name == image['name'] for image in self._cv_images) if isinstance(self._cv_images, list) else False + return ( + any(image_name == image["name"] for image in self._cv_images) + if isinstance(self._cv_images, list) + else False + ) def _does_bundle_exist(self, bundle_name: str) -> bool: # pylint: disable=unused-argument @@ -192,19 +202,23 @@ class CvFeatureManager(): """ image_item = Filer(path=image_path) if image_item.file_exist is False: - logger.error(f'File not found: {image_item.relative_path}') + logger.error(f"File not found: {image_item.relative_path}") return False - logger.info(f'File path for image: {image_item}') + logger.info(f"File path for image: {image_item}") if self._does_image_exist(image_name=image_item.filename): - logger.error("Image found in Cloudvision , Please delete it before running this script") + logger.error( + "Image found in Cloudvision , Please delete it before running this script" + ) return False try: - upload_result = self._cv_instance.api.add_image(filepath=image_item.absolute_path) + upload_result = self._cv_instance.api.add_image( + filepath=image_item.absolute_path + ) except Exception as e: # pylint: disable=broad-exception-caught - logger.error('An error occurred during upload, check CV connection') - logger.error(f'Exception message is: {e}') + logger.error("An error occurred during upload, check CV connection") + logger.error(f"Exception message is: {e}") return False - logger.debug(f'Upload Result is : {upload_result}') + logger.debug(f"Upload Result is : {upload_result}") return True def build_image_list(self, image_list: List[str]) -> List[Any]: @@ -252,25 +266,30 @@ class CvFeatureManager(): bool True if succeeds """ - logger.debug(f'Init creation of an image bundle {name} with following images {images_name}') + logger.debug( + f"Init creation of an image bundle {name} with following images {images_name}" + ) all_images_present: List[bool] = [] self._cv_images = self.__get_images() all_images_present.extend( - self._does_image_exist(image_name=image_name) - for image_name in images_name + self._does_image_exist(image_name=image_name) for image_name in images_name ) # Bundle Create if self._does_bundle_exist(bundle_name=name) is False: - logger.debug(f'Creating image bundle {name} with following images {images_name}') + logger.debug( + f"Creating image bundle {name} with following images {images_name}" + ) images_data = self.build_image_list(image_list=images_name) if images_data is not None: - logger.debug('Images information: {images_data}') + logger.debug("Images information: {images_data}") try: - data = self._cv_instance.api.save_image_bundle(name=name, images=images_data) + data = self._cv_instance.api.save_image_bundle( + name=name, images=images_data + ) except Exception as e: # pylint: disable=broad-exception-caught - logger.critical(f'{e}') + logger.critical(f"{e}") else: logger.debug(data) return True - logger.critical('No data found for images') + logger.critical("No data found for images") return False diff --git a/eos_downloader/data.py b/eos_downloader/data.py index 74f2f8e..ba54b3b 100644 --- a/eos_downloader/data.py +++ b/eos_downloader/data.py @@ -12,82 +12,22 @@ Data are built from content of Arista XML file # [platform][image][version] DATA_MAPPING = { "CloudVision": { - "ova": { - "extension": ".ova", - "prepend": "cvp", - "folder_level": 0 - }, - "rpm": { - "extension": "", - "prepend": "cvp-rpm-installer", - "folder_level": 0 - }, - "kvm": { - "extension": "-kvm.tgz", - "prepend": "cvp", - "folder_level": 0 - }, - "upgrade": { - "extension": ".tgz", - "prepend": "cvp-upgrade", - "folder_level": 0 - }, + "ova": {"extension": ".ova", "prepend": "cvp", "folder_level": 0}, + "rpm": {"extension": "", "prepend": "cvp-rpm-installer", "folder_level": 0}, + "kvm": {"extension": "-kvm.tgz", "prepend": "cvp", "folder_level": 0}, + "upgrade": {"extension": ".tgz", "prepend": "cvp-upgrade", "folder_level": 0}, }, "EOS": { - "64": { - "extension": ".swi", - "prepend": "EOS64", - "folder_level": 0 - }, - "INT": { - "extension": "-INT.swi", - "prepend": "EOS", - "folder_level": 1 - }, - "2GB-INT": { - "extension": "-INT.swi", - "prepend": "EOS-2GB", - "folder_level": 1 - }, - "cEOS": { - "extension": ".tar.xz", - "prepend": "cEOS-lab", - "folder_level": 0 - }, - "cEOS64": { - "extension": ".tar.xz", - "prepend": "cEOS64-lab", - "folder_level": 0 - }, - "vEOS": { - "extension": ".vmdk", - "prepend": "vEOS", - "folder_level": 0 - }, - "vEOS-lab": { - "extension": ".vmdk", - "prepend": "vEOS-lab", - "folder_level": 0 - }, - "EOS-2GB": { - "extension": ".swi", - "prepend": "EOS-2GB", - "folder_level": 0 - }, - "RN": { - "extension": "-", - "prepend": "RN", - "folder_level": 0 - }, - "SOURCE": { - "extension": "-source.tar", - "prepend": "EOS", - "folder_level": 0 - }, - "default": { - "extension": ".swi", - "prepend": "EOS", - "folder_level": 0 - } - } + "64": {"extension": ".swi", "prepend": "EOS64", "folder_level": 0}, + "INT": {"extension": "-INT.swi", "prepend": "EOS", "folder_level": 1}, + "2GB-INT": {"extension": "-INT.swi", "prepend": "EOS-2GB", "folder_level": 1}, + "cEOS": {"extension": ".tar.xz", "prepend": "cEOS-lab", "folder_level": 0}, + "cEOS64": {"extension": ".tar.xz", "prepend": "cEOS64-lab", "folder_level": 0}, + "vEOS": {"extension": ".vmdk", "prepend": "vEOS", "folder_level": 0}, + "vEOS-lab": {"extension": ".vmdk", "prepend": "vEOS-lab", "folder_level": 0}, + "EOS-2GB": {"extension": ".swi", "prepend": "EOS-2GB", "folder_level": 0}, + "RN": {"extension": "-", "prepend": "RN", "folder_level": 0}, + "SOURCE": {"extension": "-source.tar", "prepend": "EOS", "folder_level": 0}, + "default": {"extension": ".swi", "prepend": "EOS", "folder_level": 0}, + }, } diff --git a/eos_downloader/download.py b/eos_downloader/download.py index 2297b04..2c9576e 100644 --- a/eos_downloader/download.py +++ b/eos_downloader/download.py @@ -8,13 +8,20 @@ import os.path import signal from concurrent.futures import ThreadPoolExecutor from threading import Event -from typing import Iterable, Any +from typing import Any, Iterable import requests import rich from rich import console -from rich.progress import (BarColumn, DownloadColumn, Progress, TaskID, - TextColumn, TimeElapsedColumn, TransferSpeedColumn) +from rich.progress import ( + BarColumn, + DownloadColumn, + Progress, + TaskID, + TextColumn, + TimeElapsedColumn, + TransferSpeedColumn, +) console = rich.get_console() done_event = Event() @@ -28,7 +35,7 @@ def handle_sigint(signum: Any, frame: Any) -> None: signal.signal(signal.SIGINT, handle_sigint) -class DownloadProgressBar(): +class DownloadProgressBar: """ Object to manage Download process with Progress Bar from Rich """ @@ -38,7 +45,9 @@ class DownloadProgressBar(): Class Constructor """ self.progress = Progress( - TextColumn("šŸ’¾ Downloading [bold blue]{task.fields[filename]}", justify="right"), + TextColumn( + "šŸ’¾ Downloading [bold blue]{task.fields[filename]}", justify="right" + ), BarColumn(bar_width=None), "[progress.percentage]{task.percentage:>3.1f}%", "ā€¢", @@ -48,14 +57,16 @@ class DownloadProgressBar(): "ā€¢", TimeElapsedColumn(), "ā€¢", - console=console + console=console, ) - def _copy_url(self, task_id: TaskID, url: str, path: str, block_size: int = 1024) -> bool: + def _copy_url( + self, task_id: TaskID, url: str, path: str, block_size: int = 1024 + ) -> bool: """Copy data from a url to a local file.""" response = requests.get(url, stream=True, timeout=5) # This will break if the response doesn't contain content length - self.progress.update(task_id, total=int(response.headers['Content-Length'])) + self.progress.update(task_id, total=int(response.headers["Content-Length"])) with open(path, "wb") as dest_file: self.progress.start_task(task_id) for data in response.iter_content(chunk_size=block_size): @@ -71,7 +82,9 @@ class DownloadProgressBar(): with self.progress: with ThreadPoolExecutor(max_workers=4) as pool: for url in urls: - filename = url.split("/")[-1].split('?')[0] + filename = url.split("/")[-1].split("?")[0] dest_path = os.path.join(dest_dir, filename) - task_id = self.progress.add_task("download", filename=filename, start=False) + task_id = self.progress.add_task( + "download", filename=filename, start=False + ) pool.submit(self._copy_url, task_id, url, dest_path) diff --git a/eos_downloader/eos.py b/eos_downloader/eos.py index e5f3670..716992f 100644 --- a/eos_downloader/eos.py +++ b/eos_downloader/eos.py @@ -14,13 +14,20 @@ import rich from loguru import logger from rich import console -from eos_downloader.models.version import BASE_BRANCH_STR, BASE_VERSION_STR, REGEX_EOS_VERSION, RTYPE_FEATURE, EosVersion +from eos_downloader.models.version import ( + BASE_BRANCH_STR, + BASE_VERSION_STR, + REGEX_EOS_VERSION, + RTYPE_FEATURE, + EosVersion, +) from eos_downloader.object_downloader import ObjectDownloader # logger = logging.getLogger(__name__) console = rich.get_console() + class EOSDownloader(ObjectDownloader): """ EOSDownloader Object to download EOS images from Arista.com website @@ -47,22 +54,27 @@ class EOSDownloader(ObjectDownloader): file_path : str Path where EOS image is located """ - logger.info('Mounting volume to disable ZTP') - console.print('šŸš€ Mounting volume to disable ZTP') + logger.info("Mounting volume to disable ZTP") + console.print("šŸš€ Mounting volume to disable ZTP") raw_folder = os.path.join(file_path, "raw") os.system(f"rm -rf {raw_folder}") os.system(f"mkdir -p {raw_folder}") os.system( - f'guestmount -a {os.path.join(file_path, "hda.qcow2")} -m /dev/sda2 {os.path.join(file_path, "raw")}') - ztp_file = os.path.join(file_path, 'raw/zerotouch-config') - with open(ztp_file, 'w', encoding='ascii') as zfile: - zfile.write('DISABLE=True') - logger.info(f'Unmounting volume in {file_path}') + f'guestmount -a {os.path.join(file_path, "hda.qcow2")} -m /dev/sda2 {os.path.join(file_path, "raw")}' + ) + ztp_file = os.path.join(file_path, "raw/zerotouch-config") + with open(ztp_file, "w", encoding="ascii") as zfile: + zfile.write("DISABLE=True") + logger.info(f"Unmounting volume in {file_path}") os.system(f"guestunmount {os.path.join(file_path, 'raw')}") os.system(f"rm -rf {os.path.join(file_path, 'raw')}") logger.info(f"Volume has been successfully unmounted at {file_path}") - def _parse_xml_for_version(self,root_xml: ET.ElementTree, xpath: str = './/dir[@label="Active Releases"]/dir/dir/[@label]') -> List[EosVersion]: + def _parse_xml_for_version( + self, + root_xml: ET.ElementTree, + xpath: str = './/dir[@label="Active Releases"]/dir/dir/[@label]', + ) -> List[EosVersion]: """ Extract list of available EOS versions from Arista.com website @@ -77,19 +89,21 @@ class EOSDownloader(ObjectDownloader): """ # XPATH: .//dir[@label="Active Releases"]/dir/dir/[@label] if self.eos_versions is None: - logger.debug(f'Using xpath {xpath}') + logger.debug(f"Using xpath {xpath}") eos_versions = [] for node in root_xml.findall(xpath): - if 'label' in node.attrib and node.get('label') is not None: - label = node.get('label') + if "label" in node.attrib and node.get("label") is not None: + label = node.get("label") if label is not None and REGEX_EOS_VERSION.match(label): eos_version = EosVersion.from_str(label) eos_versions.append(eos_version) logger.debug(f"Found {label} - {eos_version}") - logger.debug(f'List of versions found on arista.com is: {eos_versions}') + logger.debug(f"List of versions found on arista.com is: {eos_versions}") self.eos_versions = eos_versions else: - logger.debug('receiving instruction to download versions, but already available') + logger.debug( + "receiving instruction to download versions, but already available" + ) return self.eos_versions def _get_branches(self, with_rtype: str = RTYPE_FEATURE) -> List[str]: @@ -104,9 +118,11 @@ class EOSDownloader(ObjectDownloader): Returns: List[str]: A lsit of string that represent all availables EOS branches """ - root = self._get_folder_tree() + root = self.get_folder_tree() versions = self._parse_xml_for_version(root_xml=root) - return list({version.branch for version in versions if version.rtype == with_rtype}) + return list( + {version.branch for version in versions if version.rtype == with_rtype} + ) def latest_branch(self, rtype: str = RTYPE_FEATURE) -> EosVersion: """ @@ -125,7 +141,9 @@ class EOSDownloader(ObjectDownloader): selected_branch = branch return selected_branch - def get_eos_versions(self, branch: Union[str,None] = None, rtype: Union[str,None] = None) -> List[EosVersion]: + def get_eos_versions( + self, branch: Union[str, None] = None, rtype: Union[str, None] = None + ) -> List[EosVersion]: """ Get a list of available EOS version available on arista.com @@ -139,16 +157,22 @@ class EOSDownloader(ObjectDownloader): Returns: List[EosVersion]: A list of versions available """ - root = self._get_folder_tree() + root = self.get_folder_tree() result = [] for version in self._parse_xml_for_version(root_xml=root): if branch is None and (version.rtype == rtype or rtype is None): result.append(version) - elif branch is not None and version.is_in_branch(branch) and version.rtype == rtype: + elif ( + branch is not None + and version.is_in_branch(branch) + and version.rtype == rtype + ): result.append(version) return result - def latest_eos(self, branch: Union[str,None] = None, rtype: str = RTYPE_FEATURE) -> EosVersion: + def latest_eos( + self, branch: Union[str, None] = None, rtype: str = RTYPE_FEATURE + ) -> EosVersion: """ Get latest version of EOS @@ -168,7 +192,9 @@ class EOSDownloader(ObjectDownloader): latest_branch = self.latest_branch(rtype=rtype) else: latest_branch = EosVersion.from_str(branch) - for version in self.get_eos_versions(branch=str(latest_branch.branch), rtype=rtype): + for version in self.get_eos_versions( + branch=str(latest_branch.branch), rtype=rtype + ): if version > selected_version: if rtype is not None and version.rtype == rtype: selected_version = version diff --git a/eos_downloader/models/version.py b/eos_downloader/models/version.py index 4b051a5..22de100 100644 --- a/eos_downloader/models/version.py +++ b/eos_downloader/models/version.py @@ -16,11 +16,11 @@ from eos_downloader.tools import exc_to_str # logger = logging.getLogger(__name__) -BASE_VERSION_STR = '4.0.0F' -BASE_BRANCH_STR = '4.0' +BASE_VERSION_STR = "4.0.0F" +BASE_BRANCH_STR = "4.0" -RTYPE_FEATURE = 'F' -RTYPE_MAINTENANCE = 'M' +RTYPE_FEATURE = "F" +RTYPE_MAINTENANCE = "M" RTYPES = [RTYPE_FEATURE, RTYPE_MAINTENANCE] # Regular Expression to capture multiple EOS version format @@ -29,8 +29,12 @@ RTYPES = [RTYPE_FEATURE, RTYPE_MAINTENANCE] # 4.21.1M # 4.28.10.F # 4.28.6.1M -REGEX_EOS_VERSION = re.compile(r"^.*(?P4)\.(?P\d{1,2})\.(?P\d{1,2})(?P\.\d*)*(?P[M,F])*$") -REGEX_EOS_BRANCH = re.compile(r"^.*(?P4)\.(?P\d{1,2})(\.?P\d)*(\.\d)*(?P[M,F])*$") +REGEX_EOS_VERSION = re.compile( + r"^.*(?P4)\.(?P\d{1,2})\.(?P\d{1,2})(?P\.\d*)*(?P[M,F])*$" +) +REGEX_EOS_BRANCH = re.compile( + r"^.*(?P4)\.(?P\d{1,2})(\.?P\d)*(\.\d)*(?P[M,F])*$" +) class EosVersion(BaseModel): @@ -59,10 +63,11 @@ class EosVersion(BaseModel): Args: BaseModel (Pydantic): Pydantic Base Model """ + major: int = 4 minor: int = 0 patch: int = 0 - rtype: Optional[str] = 'F' + rtype: Optional[str] = "F" other: Any = None @classmethod @@ -84,7 +89,7 @@ class EosVersion(BaseModel): Returns: EosVersion object """ - logger.debug(f'receiving version: {eos_version}') + logger.debug(f"receiving version: {eos_version}") if REGEX_EOS_VERSION.match(eos_version): matches = REGEX_EOS_VERSION.match(eos_version) # assert matches is not None @@ -95,7 +100,7 @@ class EosVersion(BaseModel): # assert matches is not None assert matches is not None return cls(**matches.groupdict()) - logger.error(f'Error occured with {eos_version}') + logger.error(f"Error occured with {eos_version}") return EosVersion() @property @@ -106,7 +111,7 @@ class EosVersion(BaseModel): Returns: str: branch from version """ - return f'{self.major}.{self.minor}' + return f"{self.major}.{self.minor}" def __str__(self) -> str: """ @@ -118,8 +123,8 @@ class EosVersion(BaseModel): str: A standard EOS version string representing .. """ if self.other is None: - return f'{self.major}.{self.minor}.{self.patch}{self.rtype}' - return f'{self.major}.{self.minor}.{self.patch}{self.other}{self.rtype}' + return f"{self.major}.{self.minor}.{self.patch}{self.rtype}" + return f"{self.major}.{self.minor}.{self.patch}{self.other}{self.rtype}" def _compare(self, other: EosVersion) -> float: """ @@ -141,58 +146,68 @@ class EosVersion(BaseModel): float: -1 if ver1 < ver2, 0 if ver1 == ver2, 1 if ver1 > ver2 """ if not isinstance(other, EosVersion): - raise ValueError(f'could not compare {other} as it is not an EosVersion object') + raise ValueError( + f"could not compare {other} as it is not an EosVersion object" + ) comparison_flag: float = 0 - logger.warning(f'current version {self.__str__()} - other {str(other)}') # pylint: disable = unnecessary-dunder-call + logger.warning( + f"current version {self.__str__()} - other {str(other)}" # pylint: disable = unnecessary-dunder-call + ) for key, _ in self.dict().items(): - if comparison_flag == 0 and self.dict()[key] is None or other.dict()[key] is None: - logger.debug(f'{key}: local None - remote None') - logger.debug(f'{key}: local {self.dict()} - remote {other.dict()}') + if ( + comparison_flag == 0 + and self.dict()[key] is None + or other.dict()[key] is None + ): + logger.debug(f"{key}: local None - remote None") + logger.debug(f"{key}: local {self.dict()} - remote {other.dict()}") return comparison_flag - logger.debug(f'{key}: local {self.dict()[key]} - remote {other.dict()[key]}') + logger.debug( + f"{key}: local {self.dict()[key]} - remote {other.dict()[key]}" + ) if comparison_flag == 0 and self.dict()[key] < other.dict()[key]: comparison_flag = -1 if comparison_flag == 0 and self.dict()[key] > other.dict()[key]: comparison_flag = 1 if comparison_flag != 0: - logger.info(f'comparison result is {comparison_flag}') + logger.info(f"comparison result is {comparison_flag}") return comparison_flag - logger.info(f'comparison result is {comparison_flag}') + logger.info(f"comparison result is {comparison_flag}") return comparison_flag @typing.no_type_check def __eq__(self, other): - """ Implement __eq__ function (==) """ + """Implement __eq__ function (==)""" return self._compare(other) == 0 @typing.no_type_check def __ne__(self, other): # type: ignore - """ Implement __nw__ function (!=) """ + """Implement __nw__ function (!=)""" return self._compare(other) != 0 @typing.no_type_check def __lt__(self, other): # type: ignore - """ Implement __lt__ function (<) """ + """Implement __lt__ function (<)""" return self._compare(other) < 0 @typing.no_type_check def __le__(self, other): # type: ignore - """ Implement __le__ function (<=) """ + """Implement __le__ function (<=)""" return self._compare(other) <= 0 @typing.no_type_check def __gt__(self, other): # type: ignore - """ Implement __gt__ function (>) """ + """Implement __gt__ function (>)""" return self._compare(other) > 0 @typing.no_type_check def __ge__(self, other): # type: ignore - """ Implement __ge__ function (>=) """ + """Implement __ge__ function (>=)""" return self._compare(other) >= 0 def match(self, match_expr: str) -> bool: @@ -236,7 +251,7 @@ class EosVersion(BaseModel): "['<', '>', '==', '<=', '>=', '!=']. " f"You provided: {match_expr}" ) - logger.debug(f'work on comparison {prefix} with base release {match_version}') + logger.debug(f"work on comparison {prefix} with base release {match_version}") possibilities_dict = { ">": (1,), "<": (-1,), @@ -263,7 +278,7 @@ class EosVersion(BaseModel): bool: True if current version is in provided branch, otherwise False """ try: - logger.debug(f'reading branch str:{branch_str}') + logger.debug(f"reading branch str:{branch_str}") branch = EosVersion.from_str(branch_str) except Exception as error: # pylint: disable = broad-exception-caught logger.error(exc_to_str(error)) diff --git a/eos_downloader/object_downloader.py b/eos_downloader/object_downloader.py index 0420acb..d7b1418 100644 --- a/eos_downloader/object_downloader.py +++ b/eos_downloader/object_downloader.py @@ -8,8 +8,13 @@ eos_downloader class definition """ -from __future__ import (absolute_import, division, print_function, - unicode_literals, annotations) +from __future__ import ( + absolute_import, + annotations, + division, + print_function, + unicode_literals, +) import base64 import glob @@ -26,9 +31,14 @@ from loguru import logger from rich import console from tqdm import tqdm -from eos_downloader import (ARISTA_DOWNLOAD_URL, ARISTA_GET_SESSION, - ARISTA_SOFTWARE_FOLDER_TREE, EVE_QEMU_FOLDER_PATH, - MSG_INVALID_DATA, MSG_TOKEN_EXPIRED) +from eos_downloader import ( + ARISTA_DOWNLOAD_URL, + ARISTA_GET_SESSION, + ARISTA_SOFTWARE_FOLDER_TREE, + EVE_QEMU_FOLDER_PATH, + MSG_INVALID_DATA, + MSG_TOKEN_EXPIRED, +) from eos_downloader.data import DATA_MAPPING from eos_downloader.download import DownloadProgressBar @@ -37,11 +47,19 @@ from eos_downloader.download import DownloadProgressBar console = rich.get_console() -class ObjectDownloader(): +class ObjectDownloader: """ ObjectDownloader Generic Object to download from Arista.com """ - def __init__(self, image: str, version: str, token: str, software: str = 'EOS', hash_method: str = 'md5sum'): + + def __init__( + self, + image: str, + version: str, + token: str, + software: str = "EOS", + hash_method: str = "md5sum", + ): """ __init__ Class constructor @@ -70,10 +88,10 @@ class ObjectDownloader(): self.hash_method = hash_method self.timeout = 5 # Logging - logger.debug(f'Filename built by _build_filename is {self.filename}') + logger.debug(f"Filename built by _build_filename is {self.filename}") def __str__(self) -> str: - return f'{self.software} - {self.image} - {self.version}' + return f"{self.software} - {self.image} - {self.version}" # def __repr__(self): # return str(self.__dict__) @@ -102,16 +120,18 @@ class ObjectDownloader(): str: Filename to search for on Arista.com """ - logger.info('start build') + logger.info("start build") if self.software in DATA_MAPPING: - logger.info(f'software in data mapping: {self.software}') + logger.info(f"software in data mapping: {self.software}") if self.image in DATA_MAPPING[self.software]: - logger.info(f'image in data mapping: {self.image}') + logger.info(f"image in data mapping: {self.image}") return f"{DATA_MAPPING[self.software][self.image]['prepend']}-{self.version}{DATA_MAPPING[self.software][self.image]['extension']}" return f"{DATA_MAPPING[self.software]['default']['prepend']}-{self.version}{DATA_MAPPING[self.software]['default']['extension']}" - raise ValueError(f'Incorrect value for software {self.software}') + raise ValueError(f"Incorrect value for software {self.software}") - def _parse_xml_for_path(self, root_xml: ET.ElementTree, xpath: str, search_file: str) -> str: + def _parse_xml_for_path( + self, root_xml: ET.ElementTree, xpath: str, search_file: str + ) -> str: # sourcery skip: remove-unnecessary-cast """ _parse_xml Read and extract data from XML using XPATH @@ -132,18 +152,18 @@ class ObjectDownloader(): str File Path on Arista server side """ - logger.debug(f'Using xpath {xpath}') - logger.debug(f'Search for file {search_file}') - console.print(f'šŸ”Ž Searching file {search_file}') + logger.debug(f"Using xpath {xpath}") + logger.debug(f"Search for file {search_file}") + console.print(f"šŸ”Ž Searching file {search_file}") for node in root_xml.findall(xpath): # logger.debug('Found {}', node.text) if str(node.text).lower() == search_file.lower(): - path = node.get('path') - console.print(f' -> Found file at {path}') + path = node.get("path") + console.print(f" -> Found file at {path}") logger.info(f'Found {node.text} at {node.get("path")}') - return str(node.get('path')) if node.get('path') is not None else '' - logger.error(f'Requested file ({self.filename}) not found !') - return '' + return str(node.get("path")) if node.get("path") is not None else "" + logger.error(f"Requested file ({self.filename}) not found !") + return "" def _get_hash(self, file_path: str) -> str: """ @@ -165,10 +185,10 @@ class ObjectDownloader(): dl_rich_progress_bar = DownloadProgressBar() dl_rich_progress_bar.download(urls=[hash_url], dest_dir=file_path) hash_downloaded = f"{file_path}/{os.path.basename(remote_hash_file)}" - hash_content = 'unset' - with open(hash_downloaded, 'r', encoding='utf-8') as f: + hash_content = "unset" + with open(hash_downloaded, "r", encoding="utf-8") as f: hash_content = f.read() - return hash_content.split(' ')[0] + return hash_content.split(" ")[0] @staticmethod def _compute_hash_md5sum(file: str, hash_expected: str) -> bool: @@ -195,7 +215,9 @@ class ObjectDownloader(): hash_md5.update(chunk) if hash_md5.hexdigest() == hash_expected: return True - logger.warning(f'Downloaded file is corrupt: local md5 ({hash_md5.hexdigest()}) is different to md5 from arista ({hash_expected})') + logger.warning( + f"Downloaded file is corrupt: local md5 ({hash_md5.hexdigest()}) is different to md5 from arista ({hash_expected})" + ) return False @staticmethod @@ -223,10 +245,12 @@ class ObjectDownloader(): hash_sha512.update(chunk) if hash_sha512.hexdigest() == hash_expected: return True - logger.warning(f'Downloaded file is corrupt: local sha512 ({hash_sha512.hexdigest()}) is different to sha512 from arista ({hash_expected})') + logger.warning( + f"Downloaded file is corrupt: local sha512 ({hash_sha512.hexdigest()}) is different to sha512 from arista ({hash_expected})" + ) return False - def _get_folder_tree(self) -> ET.ElementTree: + def get_folder_tree(self) -> ET.ElementTree: """ _get_folder_tree Download XML tree from Arista server @@ -237,15 +261,17 @@ class ObjectDownloader(): """ if self.session_id is None: self.authenticate() - jsonpost = {'sessionCode': self.session_id} - result = requests.post(ARISTA_SOFTWARE_FOLDER_TREE, data=json.dumps(jsonpost), timeout=self.timeout) + jsonpost = {"sessionCode": self.session_id} + result = requests.post( + ARISTA_SOFTWARE_FOLDER_TREE, data=json.dumps(jsonpost), timeout=self.timeout + ) try: folder_tree = result.json()["data"]["xml"] return ET.ElementTree(ET.fromstring(folder_tree)) except KeyError as error: logger.error(MSG_INVALID_DATA) - logger.error(f'Server returned: {error}') - console.print(f'āŒ {MSG_INVALID_DATA}', style="bold red") + logger.error(f"Server returned: {error}") + console.print(f"āŒ {MSG_INVALID_DATA}", style="bold red") sys.exit(1) def _get_remote_filepath(self) -> str: @@ -259,12 +285,14 @@ class ObjectDownloader(): str Remote path of the file to download """ - root = self._get_folder_tree() + root = self.get_folder_tree() logger.debug("GET XML content from ARISTA.com") xpath = f'.//dir[@label="{self.software}"]//file' - return self._parse_xml_for_path(root_xml=root, xpath=xpath, search_file=self.filename) + return self._parse_xml_for_path( + root_xml=root, xpath=xpath, search_file=self.filename + ) - def _get_remote_hashpath(self, hash_method: str = 'md5sum') -> str: + def _get_remote_hashpath(self, hash_method: str = "md5sum") -> str: """ _get_remote_hashpath Helper to get path of the hash's file to download @@ -275,16 +303,16 @@ class ObjectDownloader(): str Remote path of the hash's file to download """ - root = self._get_folder_tree() + root = self.get_folder_tree() logger.debug("GET XML content from ARISTA.com") xpath = f'.//dir[@label="{self.software}"]//file' return self._parse_xml_for_path( root_xml=root, xpath=xpath, - search_file=f'{self.filename}.{hash_method}', + search_file=f"{self.filename}.{hash_method}", ) - def _get_url(self, remote_file_path: str) -> str: + def _get_url(self, remote_file_path: str) -> str: """ _get_url Get URL to use for downloading file from Arista server @@ -302,13 +330,15 @@ class ObjectDownloader(): """ if self.session_id is None: self.authenticate() - jsonpost = {'sessionCode': self.session_id, 'filePath': remote_file_path} - result = requests.post(ARISTA_DOWNLOAD_URL, data=json.dumps(jsonpost), timeout=self.timeout) - if 'data' in result.json() and 'url' in result.json()['data']: + jsonpost = {"sessionCode": self.session_id, "filePath": remote_file_path} + result = requests.post( + ARISTA_DOWNLOAD_URL, data=json.dumps(jsonpost), timeout=self.timeout + ) + if "data" in result.json() and "url" in result.json()["data"]: # logger.debug('URL to download file is: {}', result.json()) return result.json()["data"]["url"] - logger.critical(f'Server returns following message: {result.json()}') - return '' + logger.critical(f"Server returns following message: {result.json()}") + return "" @staticmethod def _download_file_raw(url: str, file_path: str) -> str: @@ -331,31 +361,40 @@ class ObjectDownloader(): """ chunkSize = 1024 r = requests.get(url, stream=True, timeout=5) - with open(file_path, 'wb') as f: - pbar = tqdm(unit="B", total=int(r.headers['Content-Length']), unit_scale=True, unit_divisor=1024) + with open(file_path, "wb") as f: + pbar = tqdm( + unit="B", + total=int(r.headers["Content-Length"]), + unit_scale=True, + unit_divisor=1024, + ) for chunk in r.iter_content(chunk_size=chunkSize): if chunk: pbar.update(len(chunk)) f.write(chunk) return file_path - def _download_file(self, file_path: str, filename: str, rich_interface: bool = True) -> Union[None, str]: + def _download_file( + self, file_path: str, filename: str, rich_interface: bool = True + ) -> Union[None, str]: remote_file_path = self._get_remote_filepath() - logger.info(f'File found on arista server: {remote_file_path}') + logger.info(f"File found on arista server: {remote_file_path}") file_url = self._get_url(remote_file_path=remote_file_path) if file_url is not False: if not rich_interface: - return self._download_file_raw(url=file_url, file_path=os.path.join(file_path, filename)) + return self._download_file_raw( + url=file_url, file_path=os.path.join(file_path, filename) + ) rich_downloader = DownloadProgressBar() rich_downloader.download(urls=[file_url], dest_dir=file_path) return os.path.join(file_path, filename) - logger.error(f'Cannot download file {file_path}') + logger.error(f"Cannot download file {file_path}") return None @staticmethod def _create_destination_folder(path: str) -> None: # os.makedirs(path, mode, exist_ok=True) - os.system(f'mkdir -p {path}') + os.system(f"mkdir -p {path}") @staticmethod def _disable_ztp(file_path: str) -> None: @@ -379,24 +418,29 @@ class ObjectDownloader(): """ credentials = (base64.b64encode(self.token.encode())).decode("utf-8") session_code_url = ARISTA_GET_SESSION - jsonpost = {'accessToken': credentials} + jsonpost = {"accessToken": credentials} - result = requests.post(session_code_url, data=json.dumps(jsonpost), timeout=self.timeout) + result = requests.post( + session_code_url, data=json.dumps(jsonpost), timeout=self.timeout + ) - if result.json()["status"]["message"] in[ 'Access token expired', 'Invalid access token']: - console.print(f'āŒ {MSG_TOKEN_EXPIRED}', style="bold red") + if result.json()["status"]["message"] in [ + "Access token expired", + "Invalid access token", + ]: + console.print(f"āŒ {MSG_TOKEN_EXPIRED}", style="bold red") logger.error(MSG_TOKEN_EXPIRED) return False try: - if 'data' in result.json(): + if "data" in result.json(): self.session_id = result.json()["data"]["session_code"] - logger.info('Authenticated on arista.com') + logger.info("Authenticated on arista.com") return True - logger.debug(f'{result.json()}') + logger.debug(f"{result.json()}") return False except KeyError as error_arista: - logger.error(f'Error: {error_arista}') + logger.error(f"Error: {error_arista}") sys.exit(1) def download_local(self, file_path: str, checksum: bool = False) -> bool: @@ -422,25 +466,33 @@ class ObjectDownloader(): bool True if everything went well, False if any problem appears """ - file_downloaded = str(self._download_file(file_path=file_path, filename=self.filename)) + file_downloaded = str( + self._download_file(file_path=file_path, filename=self.filename) + ) # Check file HASH hash_result = False if checksum: - logger.info('šŸš€ Running checksum validation') - console.print('šŸš€ Running checksum validation') - if self.hash_method == 'md5sum': + logger.info("šŸš€ Running checksum validation") + console.print("šŸš€ Running checksum validation") + if self.hash_method == "md5sum": hash_expected = self._get_hash(file_path=file_path) - hash_result = self._compute_hash_md5sum(file=file_downloaded, hash_expected=hash_expected) - elif self.hash_method == 'sha512sum': + hash_result = self._compute_hash_md5sum( + file=file_downloaded, hash_expected=hash_expected + ) + elif self.hash_method == "sha512sum": hash_expected = self._get_hash(file_path=file_path) - hash_result = self._compute_hash_sh512sum(file=file_downloaded, hash_expected=hash_expected) + hash_result = self._compute_hash_sh512sum( + file=file_downloaded, hash_expected=hash_expected + ) if not hash_result: - logger.error('Downloaded file is corrupted, please check your connection') - console.print('āŒ Downloaded file is corrupted, please check your connection') + logger.error("Downloaded file is corrupted, please check your connection") + console.print( + "āŒ Downloaded file is corrupted, please check your connection" + ) return False - logger.info('Downloaded file is correct.') - console.print('āœ… Downloaded file is correct.') + logger.info("Downloaded file is correct.") + console.print("āœ… Downloaded file is correct.") return True def provision_eve(self, noztp: bool = False, checksum: bool = True) -> None: @@ -466,7 +518,7 @@ class ObjectDownloader(): # Build image name to use in folder path eos_image_name = self.filename.rstrip(".vmdk").lower() if noztp: - eos_image_name = f'{eos_image_name}-noztp' + eos_image_name = f"{eos_image_name}-noztp" # Create full path for EVE-NG file_path = os.path.join(EVE_QEMU_FOLDER_PATH, eos_image_name.rstrip()) # Create folders in filesystem @@ -474,20 +526,23 @@ class ObjectDownloader(): # Download file to local destination file_downloaded = self._download_file( - file_path=file_path, filename=self.filename) + file_path=file_path, filename=self.filename + ) # Convert to QCOW2 format file_qcow2 = os.path.join(file_path, "hda.qcow2") - logger.info('Converting VMDK to QCOW2 format') - console.print('šŸš€ Converting VMDK to QCOW2 format...') + logger.info("Converting VMDK to QCOW2 format") + console.print("šŸš€ Converting VMDK to QCOW2 format...") - os.system(f'$(which qemu-img) convert -f vmdk -O qcow2 {file_downloaded} {file_qcow2}') + os.system( + f"$(which qemu-img) convert -f vmdk -O qcow2 {file_downloaded} {file_qcow2}" + ) - logger.info('Applying unl_wrapper to fix permissions') - console.print('Applying unl_wrapper to fix permissions') + logger.info("Applying unl_wrapper to fix permissions") + console.print("Applying unl_wrapper to fix permissions") - os.system('/opt/unetlab/wrappers/unl_wrapper -a fixpermissions') - os.system(f'rm -f {file_downloaded}') + os.system("/opt/unetlab/wrappers/unl_wrapper -a fixpermissions") + os.system(f"rm -f {file_downloaded}") if noztp: self._disable_ztp(file_path=file_path) @@ -502,12 +557,12 @@ class ObjectDownloader(): version (str): image_name (str, optional): Image name to use. Defaults to "arista/ceos". """ - docker_image = f'{image_name}:{self.version}' - logger.info(f'Importing image {self.filename} to {docker_image}') - console.print(f'šŸš€ Importing image {self.filename} to {docker_image}') - os.system(f'$(which docker) import {self.filename} {docker_image}') - for filename in glob.glob(f'{self.filename}*'): + docker_image = f"{image_name}:{self.version}" + logger.info(f"Importing image {self.filename} to {docker_image}") + console.print(f"šŸš€ Importing image {self.filename} to {docker_image}") + os.system(f"$(which docker) import {self.filename} {docker_image}") + for filename in glob.glob(f"{self.filename}*"): try: os.remove(filename) except FileNotFoundError: - console.print(f'File not found: {filename}') + console.print(f"File not found: {filename}") diff --git a/pylintrc b/pylintrc index a9fbc4a..dd7dfff 100644 --- a/pylintrc +++ b/pylintrc @@ -2,7 +2,8 @@ disable= invalid-name, logging-fstring-interpolation, - fixme + fixme, + line-too-long [BASIC] good-names=runCmds, i, y, t, c, x, e, fd, ip, v diff --git a/pyproject.toml b/pyproject.toml index 1fa01f9..f98c4ee 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" [project] name = "eos_downloader" -version = "v0.8.2" +version = "v0.9.0" readme = "README.md" authors = [{ name = "Thomas Grimonet", email = "thomas.grimonet@gmail.com" }] maintainers = [ @@ -22,7 +22,7 @@ dependencies = [ "scp", "tqdm", "loguru", - "rich~=13.5.2", + "rich>=13.5.2,<13.7.0", "cvprac>=1.0.7", "click~=8.1.6", "click-help-colors~=0.9", @@ -62,7 +62,7 @@ dev = [ "pytest-html>=3.1.1", "pytest-metadata>=1.11.0", "pylint-pydantic>=0.2.4", - "tox==4.10.0", + "tox~=4.11", "types-PyYAML", "types-paramiko", "types-requests", @@ -94,7 +94,7 @@ namespaces = false # Version ################################ [tool.bumpver] -current_version = "0.8.2" +current_version = "0.9.0" version_pattern = "MAJOR.MINOR.PATCH" commit_message = "bump: Version {old_version} -> {new_version}" commit = true diff --git a/tests/lib/dataset.py b/tests/lib/dataset.py index 1286456..34175e9 100644 --- a/tests/lib/dataset.py +++ b/tests/lib/dataset.py @@ -5,12 +5,13 @@ # flake8: noqa: W503 # flake8: noqa: W1202 -from __future__ import (absolute_import, division, print_function) +from __future__ import absolute_import, division, print_function + import os + import eos_downloader -from eos_downloader.eos import EOSDownloader from eos_downloader.data import DATA_MAPPING - +from eos_downloader.eos import EOSDownloader # --------------------------------------------------------------- # # MOOCK data to use for testing @@ -18,99 +19,99 @@ from eos_downloader.data import DATA_MAPPING # Get Auth token # eos_token = os.getenv('ARISTA_TOKEN') -eos_token = os.getenv('ARISTA_TOKEN', 'invalid_token') -eos_token_invalid = 'invalid_token' +eos_token = os.getenv("ARISTA_TOKEN", "invalid_token") +eos_token_invalid = "invalid_token" eos_dataset_valid = [ { - 'image': 'EOS', - 'version': '4.26.3M', - 'software': 'EOS', - 'filename': 'EOS-4.26.3M.swi', - 'expected_hash': 'sha512sum', - 'remote_path': '/support/download/EOS-USA/Active Releases/4.26/EOS-4.26.3M/EOS-4.26.3M.swi', - 'compute_checksum': True + "image": "EOS", + "version": "4.26.3M", + "software": "EOS", + "filename": "EOS-4.26.3M.swi", + "expected_hash": "sha512sum", + "remote_path": "/support/download/EOS-USA/Active Releases/4.26/EOS-4.26.3M/EOS-4.26.3M.swi", + "compute_checksum": True, }, { - 'image': 'EOS', - 'version': '4.25.6M', - 'software': 'EOS', - 'filename': 'EOS-4.25.6M.swi', - 'expected_hash': 'md5sum', - 'remote_path': '/support/download/EOS-USA/Active Releases/4.25/EOS-4.25.6M/EOS-4.25.6M.swi', - 'compute_checksum': True + "image": "EOS", + "version": "4.25.6M", + "software": "EOS", + "filename": "EOS-4.25.6M.swi", + "expected_hash": "md5sum", + "remote_path": "/support/download/EOS-USA/Active Releases/4.25/EOS-4.25.6M/EOS-4.25.6M.swi", + "compute_checksum": True, }, { - 'image': 'vEOS-lab', - 'version': '4.25.6M', - 'software': 'EOS', - 'filename': 'vEOS-lab-4.25.6M.vmdk', - 'expected_hash': 'md5sum', - 'remote_path': '/support/download/EOS-USA/Active Releases/4.25/EOS-4.25.6M/vEOS-lab/vEOS-lab-4.25.6M.vmdk', - 'compute_checksum': False - } + "image": "vEOS-lab", + "version": "4.25.6M", + "software": "EOS", + "filename": "vEOS-lab-4.25.6M.vmdk", + "expected_hash": "md5sum", + "remote_path": "/support/download/EOS-USA/Active Releases/4.25/EOS-4.25.6M/vEOS-lab/vEOS-lab-4.25.6M.vmdk", + "compute_checksum": False, + }, ] eos_dataset_invalid = [ { - 'image': 'default', - 'version': '4.26.3M', - 'software': 'EOS', - 'filename': 'EOS-4.26.3M.swi', - 'expected_hash': 'sha512sum', - 'remote_path': '/support/download/EOS-USA/Active Releases/4.26/EOS-4.26.3M/EOS-4.26.3M.swi', - 'compute_checksum': True + "image": "default", + "version": "4.26.3M", + "software": "EOS", + "filename": "EOS-4.26.3M.swi", + "expected_hash": "sha512sum", + "remote_path": "/support/download/EOS-USA/Active Releases/4.26/EOS-4.26.3M/EOS-4.26.3M.swi", + "compute_checksum": True, } ] eos_version = [ { - 'version': 'EOS-4.23.1F', - 'is_valid': True, - 'major': 4, - 'minor': 23, - 'patch': 1, - 'rtype': 'F' + "version": "EOS-4.23.1F", + "is_valid": True, + "major": 4, + "minor": 23, + "patch": 1, + "rtype": "F", }, { - 'version': 'EOS-4.23.0', - 'is_valid': True, - 'major': 4, - 'minor': 23, - 'patch': 0, - 'rtype': None + "version": "EOS-4.23.0", + "is_valid": True, + "major": 4, + "minor": 23, + "patch": 0, + "rtype": None, }, { - 'version': 'EOS-4.23', - 'is_valid': True, - 'major': 4, - 'minor': 23, - 'patch': 0, - 'rtype': None + "version": "EOS-4.23", + "is_valid": True, + "major": 4, + "minor": 23, + "patch": 0, + "rtype": None, }, { - 'version': 'EOS-4.23.1M', - 'is_valid': True, - 'major': 4, - 'minor': 23, - 'patch': 1, - 'rtype': 'M' + "version": "EOS-4.23.1M", + "is_valid": True, + "major": 4, + "minor": 23, + "patch": 1, + "rtype": "M", }, { - 'version': 'EOS-4.23.1.F', - 'is_valid': True, - 'major': 4, - 'minor': 23, - 'patch': 1, - 'rtype': 'F' + "version": "EOS-4.23.1.F", + "is_valid": True, + "major": 4, + "minor": 23, + "patch": 1, + "rtype": "F", }, { - 'version': 'EOS-5.23.1F', - 'is_valid': False, - 'major': 4, - 'minor': 23, - 'patch': 1, - 'rtype': 'F' + "version": "EOS-5.23.1F", + "is_valid": False, + "major": 4, + "minor": 23, + "patch": 1, + "rtype": "F", }, -] \ No newline at end of file +] diff --git a/tests/lib/fixtures.py b/tests/lib/fixtures.py index 4515f9b..64b341b 100644 --- a/tests/lib/fixtures.py +++ b/tests/lib/fixtures.py @@ -5,13 +5,20 @@ # flake8: noqa: W503 # flake8: noqa: W1202 -from __future__ import (absolute_import, division, print_function) +from __future__ import absolute_import, division, print_function + import os +from typing import Any, Dict, List + import pytest -import eos_downloader -from typing import Dict, Any, List -from tests.lib.dataset import eos_dataset_valid, eos_dataset_invalid, eos_token, eos_token_invalid +import eos_downloader +from tests.lib.dataset import ( + eos_dataset_invalid, + eos_dataset_valid, + eos_token, + eos_token_invalid, +) @pytest.fixture @@ -19,17 +26,18 @@ from tests.lib.dataset import eos_dataset_valid, eos_dataset_invalid, eos_token, def create_download_instance(request, DOWNLOAD_INFO): # logger.info("Execute fixture to create class elements") request.cls.eos_downloader = eos_downloader.eos.EOSDownloader( - image=DOWNLOAD_INFO['image'], - software=DOWNLOAD_INFO['software'], - version=DOWNLOAD_INFO['version'], - token=eos_token, - hash_method='sha512sum') + image=DOWNLOAD_INFO["image"], + software=DOWNLOAD_INFO["software"], + version=DOWNLOAD_INFO["version"], + token=eos_token, + hash_method="sha512sum", + ) yield # logger.info('Cleanup test environment') - os.system('rm -f {}*'.format(DOWNLOAD_INFO['filename'])) + os.system("rm -f {}*".format(DOWNLOAD_INFO["filename"])) -def generate_test_ids_dict(val: Dict[str, Any], key: str = 'name') -> str: +def generate_test_ids_dict(val: Dict[str, Any], key: str = "name") -> str: """ generate_test_ids Helper to generate test ID for parametrize @@ -50,7 +58,8 @@ def generate_test_ids_dict(val: Dict[str, Any], key: str = 'name') -> str: return val[key] return "undefined_test" -def generate_test_ids_list(val: List[Dict[str, Any]], key: str = 'name') -> str: + +def generate_test_ids_list(val: List[Dict[str, Any]], key: str = "name") -> str: """ generate_test_ids Helper to generate test ID for parametrize @@ -66,4 +75,4 @@ def generate_test_ids_list(val: List[Dict[str, Any]], key: str = 'name') -> str: str Name of the configlet """ - return [ entry[key] if key in entry.keys() else 'unset_entry' for entry in val ] + return [entry[key] if key in entry.keys() else "unset_entry" for entry in val] diff --git a/tests/lib/helpers.py b/tests/lib/helpers.py index 308f2a5..67e914b 100644 --- a/tests/lib/helpers.py +++ b/tests/lib/helpers.py @@ -5,14 +5,13 @@ # flake8: noqa: W503 # flake8: noqa: W1202 -from __future__ import (absolute_import, division, print_function) +from __future__ import absolute_import, division, print_function import os from eos_downloader.data import DATA_MAPPING - def default_filename(version: str, info): """ default_filename Helper to build default filename @@ -31,10 +30,14 @@ def default_filename(version: str, info): """ if version is None or info is None: return None - return DATA_MAPPING[info['software']]['default']['prepend'] + '-' + version + '.swi' + return DATA_MAPPING[info["software"]]["default"]["prepend"] + "-" + version + ".swi" def is_on_github_actions(): """Check if code is running on a CI runner""" - if "CI" not in os.environ or not os.environ["CI"] or "GITHUB_RUN_ID" not in os.environ: - return False \ No newline at end of file + if ( + "CI" not in os.environ + or not os.environ["CI"] + or "GITHUB_RUN_ID" not in os.environ + ): + return False diff --git a/tests/system/test_eos_download.py.old b/tests/system/test_eos_download.py.old index 6ae56fe..91e60a5 100644 --- a/tests/system/test_eos_download.py.old +++ b/tests/system/test_eos_download.py.old @@ -45,4 +45,3 @@ class TestEosDownload_valid(): @pytest.mark.eos_download def test_download_local(self, DOWNLOAD_INFO): self.eos_downloader.download_local(file_path='.', checksum=DOWNLOAD_INFO['compute_checksum']) - diff --git a/tests/unit/test_eos_version.py b/tests/unit/test_eos_version.py index 1b97ffc..82f1269 100644 --- a/tests/unit/test_eos_version.py +++ b/tests/unit/test_eos_version.py @@ -5,126 +5,166 @@ # flake8: noqa: W503 # flake8: noqa: W1202 -from __future__ import (absolute_import, division, print_function) +from __future__ import absolute_import, division, print_function import sys -from loguru import logger + import pytest -from eos_downloader.models.version import EosVersion, BASE_VERSION_STR +from loguru import logger + +from eos_downloader.models.version import BASE_VERSION_STR, EosVersion from tests.lib.dataset import eos_version from tests.lib.fixtures import generate_test_ids_list logger.remove() logger.add(sys.stderr, level="DEBUG") -@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version')) + +@pytest.mark.parametrize( + "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version") +) def test_eos_version_from_str(EOS_VERSION): - version = EosVersion.from_str(EOS_VERSION['version']) - if EOS_VERSION['is_valid']: - assert version.major == EOS_VERSION['major'] - assert version.minor == EOS_VERSION['minor'] - assert version.patch == EOS_VERSION['patch'] - assert version.rtype == EOS_VERSION['rtype'] + version = EosVersion.from_str(EOS_VERSION["version"]) + if EOS_VERSION["is_valid"]: + assert version.major == EOS_VERSION["major"] + assert version.minor == EOS_VERSION["minor"] + assert version.patch == EOS_VERSION["patch"] + assert version.rtype == EOS_VERSION["rtype"] else: assert str(version) == BASE_VERSION_STR -@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version')) +@pytest.mark.parametrize( + "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version") +) def test_eos_version_to_str(EOS_VERSION): version = EosVersion(**EOS_VERSION) - if EOS_VERSION['is_valid']: - assert version.major == EOS_VERSION['major'] - assert version.minor == EOS_VERSION['minor'] - assert version.patch == EOS_VERSION['patch'] - assert version.rtype == EOS_VERSION['rtype'] + if EOS_VERSION["is_valid"]: + assert version.major == EOS_VERSION["major"] + assert version.minor == EOS_VERSION["minor"] + assert version.patch == EOS_VERSION["patch"] + assert version.rtype == EOS_VERSION["rtype"] + -@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version')) +@pytest.mark.parametrize( + "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version") +) def test_eos_version_branch(EOS_VERSION): - if EOS_VERSION['is_valid']: + if EOS_VERSION["is_valid"]: version = EosVersion(**EOS_VERSION) assert version.branch == f'{EOS_VERSION["major"]}.{EOS_VERSION["minor"]}' -@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version')) + +@pytest.mark.parametrize( + "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version") +) def test_eos_version_eq_operator(EOS_VERSION): - if not EOS_VERSION['is_valid']: - pytest.skip('not a valid version to test') + if not EOS_VERSION["is_valid"]: + pytest.skip("not a valid version to test") version = EosVersion(**EOS_VERSION) - logger.warning(f'version is: {version.dict()}') + logger.warning(f"version is: {version.dict()}") assert version == version -@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version')) + +@pytest.mark.parametrize( + "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version") +) def test_eos_version_ge_operator(EOS_VERSION): - if not EOS_VERSION['is_valid']: - pytest.skip('not a valid version to test') + if not EOS_VERSION["is_valid"]: + pytest.skip("not a valid version to test") version = EosVersion(**EOS_VERSION) version_b = EosVersion.from_str(BASE_VERSION_STR) assert version >= version_b -@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version')) + +@pytest.mark.parametrize( + "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version") +) def test_eos_version_gs_operator(EOS_VERSION): - if not EOS_VERSION['is_valid']: - pytest.skip('not a valid version to test') + if not EOS_VERSION["is_valid"]: + pytest.skip("not a valid version to test") version = EosVersion(**EOS_VERSION) version_b = EosVersion.from_str(BASE_VERSION_STR) assert version > version_b -@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version')) + +@pytest.mark.parametrize( + "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version") +) def test_eos_version_le_operator(EOS_VERSION): - if not EOS_VERSION['is_valid']: - pytest.skip('not a valid version to test') + if not EOS_VERSION["is_valid"]: + pytest.skip("not a valid version to test") version = EosVersion(**EOS_VERSION) version_b = EosVersion.from_str(BASE_VERSION_STR) assert version_b <= version -@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version')) + +@pytest.mark.parametrize( + "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version") +) def test_eos_version_ls_operator(EOS_VERSION): - if not EOS_VERSION['is_valid']: - pytest.skip('not a valid version to test') + if not EOS_VERSION["is_valid"]: + pytest.skip("not a valid version to test") version = EosVersion(**EOS_VERSION) version_b = EosVersion.from_str(BASE_VERSION_STR) assert version_b < version -@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version')) + +@pytest.mark.parametrize( + "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version") +) def test_eos_version_ne_operator(EOS_VERSION): - if not EOS_VERSION['is_valid']: - pytest.skip('not a valid version to test') + if not EOS_VERSION["is_valid"]: + pytest.skip("not a valid version to test") version = EosVersion(**EOS_VERSION) version_b = EosVersion.from_str(BASE_VERSION_STR) assert version_b != version -@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version')) + +@pytest.mark.parametrize( + "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version") +) def test_eos_version_match(EOS_VERSION): - if not EOS_VERSION['is_valid']: - pytest.skip('not a valid version to test') + if not EOS_VERSION["is_valid"]: + pytest.skip("not a valid version to test") version = EosVersion(**EOS_VERSION) assert version.match(f'=={EOS_VERSION["version"]}') - assert version.match(f'!={BASE_VERSION_STR}') - assert version.match(f'>={BASE_VERSION_STR}') - assert version.match(f'>{BASE_VERSION_STR}') - assert version.match('<=4.99.0F') - assert version.match('<4.99.0F') + assert version.match(f"!={BASE_VERSION_STR}") + assert version.match(f">={BASE_VERSION_STR}") + assert version.match(f">{BASE_VERSION_STR}") + assert version.match("<=4.99.0F") + assert version.match("<4.99.0F") + -@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version')) +@pytest.mark.parametrize( + "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version") +) def test_eos_version_is_in_branch(EOS_VERSION): - if not EOS_VERSION['is_valid']: - pytest.skip('not a valid version to test') + if not EOS_VERSION["is_valid"]: + pytest.skip("not a valid version to test") version = EosVersion(**EOS_VERSION) assert version.is_in_branch(f"{EOS_VERSION['major']}.{EOS_VERSION['minor']}") -@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version')) + +@pytest.mark.parametrize( + "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version") +) def test_eos_version_match_exception(EOS_VERSION): - if not EOS_VERSION['is_valid']: - pytest.skip('not a valid version to test') + if not EOS_VERSION["is_valid"]: + pytest.skip("not a valid version to test") with pytest.raises(Exception) as e_info: version = EosVersion(**EOS_VERSION) assert version.match(f'+={EOS_VERSION["version"]}') - logger.info(f'receive exception: {e_info}') + logger.info(f"receive exception: {e_info}") + -@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version')) +@pytest.mark.parametrize( + "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version") +) def test_eos_version_compare_exception(EOS_VERSION): - if not EOS_VERSION['is_valid']: - pytest.skip('not a valid version to test') + if not EOS_VERSION["is_valid"]: + pytest.skip("not a valid version to test") with pytest.raises(Exception) as e_info: version = EosVersion(**EOS_VERSION) version._compare(BASE_VERSION_STR) - logger.info(f'receive exception: {e_info}') + logger.info(f"receive exception: {e_info}") diff --git a/tests/unit/test_object_downloader.py b/tests/unit/test_object_downloader.py index 8237b1c..d178dfc 100644 --- a/tests/unit/test_object_downloader.py +++ b/tests/unit/test_object_downloader.py @@ -14,30 +14,41 @@ from loguru import logger import eos_downloader from eos_downloader.data import DATA_MAPPING from eos_downloader.eos import EOSDownloader -from tests.lib.dataset import eos_dataset_invalid, eos_dataset_valid, eos_token, eos_token_invalid +from tests.lib.dataset import ( + eos_dataset_invalid, + eos_dataset_valid, + eos_token, + eos_token_invalid, +) from tests.lib.fixtures import create_download_instance from tests.lib.helpers import default_filename, is_on_github_actions logger.remove() logger.add(sys.stderr, level="DEBUG") + @pytest.mark.usefixtures("create_download_instance") -@pytest.mark.parametrize("DOWNLOAD_INFO", eos_dataset_valid, ids=['EOS-sha512', 'EOS-md5' ,'vEOS-lab-no-hash']) +@pytest.mark.parametrize( + "DOWNLOAD_INFO", + eos_dataset_valid, + ids=["EOS-sha512", "EOS-md5", "vEOS-lab-no-hash"], +) @pytest.mark.eos_download -class TestEosDownload_valid(): +class TestEosDownload_valid: def test_data(self, DOWNLOAD_INFO): - logger.info(f'test input: {DOWNLOAD_INFO}') - logger.info(f'test build: {self.eos_downloader.__dict__}') + logger.info(f"test input: {DOWNLOAD_INFO}") + logger.info(f"test build: {self.eos_downloader.__dict__}") def test_eos_download_create(self, DOWNLOAD_INFO): my_download = eos_downloader.eos.EOSDownloader( - image=DOWNLOAD_INFO['image'], - software=DOWNLOAD_INFO['software'], - version=DOWNLOAD_INFO['version'], + image=DOWNLOAD_INFO["image"], + software=DOWNLOAD_INFO["software"], + version=DOWNLOAD_INFO["version"], token=eos_token, - hash_method='sha512sum') + hash_method="sha512sum", + ) logger.info(my_download) - assert isinstance(my_download, eos_downloader.eos.EOSDownloader) + assert isinstance(my_download, eos_downloader.eos.EOSDownloader) def test_eos_download_repr_string(self, DOWNLOAD_INFO): expected = f"{DOWNLOAD_INFO['software']} - {DOWNLOAD_INFO['image']} - {DOWNLOAD_INFO['version']}" @@ -45,47 +56,56 @@ class TestEosDownload_valid(): assert str(self.eos_downloader) == expected def test_eos_download_build_filename(self, DOWNLOAD_INFO): - assert self.eos_downloader._build_filename() == DOWNLOAD_INFO['filename'] + assert self.eos_downloader._build_filename() == DOWNLOAD_INFO["filename"] - @pytest.mark.dependency(name='authentication') - @pytest.mark.skipif(eos_token == eos_token_invalid, reason="Token is not set correctly") + @pytest.mark.dependency(name="authentication") + @pytest.mark.skipif( + eos_token == eos_token_invalid, reason="Token is not set correctly" + ) @pytest.mark.skipif(is_on_github_actions(), reason="Running on Github Runner") # @pytest.mark.xfail(reason="Deliberate - CI not set for testing AUTH") @pytest.mark.webtest def test_eos_download_authenticate(self): assert self.eos_downloader.authenticate() is True - @pytest.mark.dependency(depends=["authentication"], scope='class') + @pytest.mark.dependency(depends=["authentication"], scope="class") @pytest.mark.webtest def test_eos_download_get_remote_file_path(self, DOWNLOAD_INFO): - assert self.eos_downloader._get_remote_filepath() == DOWNLOAD_INFO['remote_path'] + assert ( + self.eos_downloader._get_remote_filepath() == DOWNLOAD_INFO["remote_path"] + ) - @pytest.mark.dependency(depends=["authentication"], scope='class') + @pytest.mark.dependency(depends=["authentication"], scope="class") @pytest.mark.webtest def test_eos_download_get_file_url(self, DOWNLOAD_INFO): - url = self.eos_downloader._get_url(remote_file_path = DOWNLOAD_INFO['remote_path']) + url = self.eos_downloader._get_url( + remote_file_path=DOWNLOAD_INFO["remote_path"] + ) logger.info(url) - assert 'https://downloads.arista.com/EOS-USA/Active%20Releases/' in url + assert "https://downloads.arista.com/EOS-USA/Active%20Releases/" in url -@pytest.mark.usefixtures("create_download_instance") -@pytest.mark.parametrize("DOWNLOAD_INFO", eos_dataset_invalid, ids=['EOS-FAKE']) -class TestEosDownload_invalid(): +@pytest.mark.usefixtures("create_download_instance") +@pytest.mark.parametrize("DOWNLOAD_INFO", eos_dataset_invalid, ids=["EOS-FAKE"]) +class TestEosDownload_invalid: def test_data(self, DOWNLOAD_INFO): - logger.info(f'test input: {dict(DOWNLOAD_INFO)}') - logger.info(f'test build: {self.eos_downloader.__dict__}') + logger.info(f"test input: {dict(DOWNLOAD_INFO)}") + logger.info(f"test build: {self.eos_downloader.__dict__}") def test_eos_download_login_error(self, DOWNLOAD_INFO): my_download = eos_downloader.eos.EOSDownloader( - image=DOWNLOAD_INFO['image'], - software=DOWNLOAD_INFO['software'], - version=DOWNLOAD_INFO['version'], + image=DOWNLOAD_INFO["image"], + software=DOWNLOAD_INFO["software"], + version=DOWNLOAD_INFO["version"], token=eos_token_invalid, - hash_method=DOWNLOAD_INFO['expected_hash']) + hash_method=DOWNLOAD_INFO["expected_hash"], + ) assert my_download.authenticate() is False - @pytest.mark.dependency(name='authentication') - @pytest.mark.skipif(eos_token == eos_token_invalid, reason="Token is not set correctly") + @pytest.mark.dependency(name="authentication") + @pytest.mark.skipif( + eos_token == eos_token_invalid, reason="Token is not set correctly" + ) @pytest.mark.skipif(is_on_github_actions(), reason="Running on Github Runner") # @pytest.mark.xfail(reason="Deliberate - CI not set for testing AUTH") @pytest.mark.webtest @@ -96,46 +116,48 @@ class TestEosDownload_invalid(): # @pytest.mark.skip(reason="Not yet implemented in lib") def test_eos_file_name_with_incorrect_software(self, DOWNLOAD_INFO): - self.eos_downloader.software = 'FAKE' - logger.info(f'test build: {self.eos_downloader.__dict__}') + self.eos_downloader.software = "FAKE" + logger.info(f"test build: {self.eos_downloader.__dict__}") with pytest.raises(ValueError) as e_info: result = self.eos_downloader._build_filename() - logger.info(f'receive exception: {e_info}') - self.eos_downloader.software = DOWNLOAD_INFO['software'] + logger.info(f"receive exception: {e_info}") + self.eos_downloader.software = DOWNLOAD_INFO["software"] @pytest.mark.webtest - @pytest.mark.dependency(depends=["authentication"], scope='class') - def test_eos_download_get_remote_file_path_for_invlaid_software(self, DOWNLOAD_INFO): - self.eos_downloader.software = 'FAKE' - logger.info(f'Platform set to: {self.eos_downloader.software}') - logger.info(f'test build: {self.eos_downloader.__dict__}') + @pytest.mark.dependency(depends=["authentication"], scope="class") + def test_eos_download_get_remote_file_path_for_invlaid_software( + self, DOWNLOAD_INFO + ): + self.eos_downloader.software = "FAKE" + logger.info(f"Platform set to: {self.eos_downloader.software}") + logger.info(f"test build: {self.eos_downloader.__dict__}") with pytest.raises(ValueError) as e_info: result = self.eos_downloader._build_filename() - logger.info(f'receive exception: {e_info}') - self.eos_downloader.software = DOWNLOAD_INFO['software'] + logger.info(f"receive exception: {e_info}") + self.eos_downloader.software = DOWNLOAD_INFO["software"] # IMAGE TESTING def test_eos_file_name_with_incorrect_image(self, DOWNLOAD_INFO): - self.eos_downloader.image = 'FAKE' - logger.info(f'Image set to: {self.eos_downloader.image}') - assert DOWNLOAD_INFO['filename'] == self.eos_downloader._build_filename() - self.eos_downloader.software == DOWNLOAD_INFO['image'] + self.eos_downloader.image = "FAKE" + logger.info(f"Image set to: {self.eos_downloader.image}") + assert DOWNLOAD_INFO["filename"] == self.eos_downloader._build_filename() + self.eos_downloader.software == DOWNLOAD_INFO["image"] @pytest.mark.webtest - @pytest.mark.dependency(depends=["authentication"], scope='class') + @pytest.mark.dependency(depends=["authentication"], scope="class") def test_eos_download_get_remote_file_path_for_invlaid_image(self, DOWNLOAD_INFO): - self.eos_downloader.image = 'FAKE' - logger.info(f'Image set to: {self.eos_downloader.image}') + self.eos_downloader.image = "FAKE" + logger.info(f"Image set to: {self.eos_downloader.image}") assert self.eos_downloader.authenticate() is True - assert DOWNLOAD_INFO['filename'] == self.eos_downloader._build_filename() - self.eos_downloader.image = DOWNLOAD_INFO['image'] + assert DOWNLOAD_INFO["filename"] == self.eos_downloader._build_filename() + self.eos_downloader.image = DOWNLOAD_INFO["image"] # VERSION TESTING @pytest.mark.webtest - @pytest.mark.dependency(depends=["authentication"], scope='class') + @pytest.mark.dependency(depends=["authentication"], scope="class") def test_eos_download_get_remote_file_path_for_invlaid_version(self, DOWNLOAD_INFO): - self.eos_downloader.version = 'FAKE' - logger.info(f'Version set to: {self.eos_downloader.version}') - assert self.eos_downloader._get_remote_filepath() == '' \ No newline at end of file + self.eos_downloader.version = "FAKE" + logger.info(f"Version set to: {self.eos_downloader.version}") + assert self.eos_downloader._get_remote_filepath() == "" -- cgit v1.2.3