summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/on_demand.yml20
-rw-r--r--.github/workflows/pr-management.yml22
-rw-r--r--.github/workflows/release.yml24
-rw-r--r--.pre-commit-config.yaml64
-rw-r--r--README.md31
-rw-r--r--bin/README.md111
-rwxr-xr-xbin/cvp-upload56
-rwxr-xr-xbin/eos-download86
-rw-r--r--eos_downloader/__init__.py27
-rw-r--r--eos_downloader/cli/cli.py48
-rw-r--r--eos_downloader/cli/debug/commands.py41
-rw-r--r--eos_downloader/cli/get/commands.py211
-rw-r--r--eos_downloader/cli/info/commands.py86
-rw-r--r--eos_downloader/cli/utils.py38
-rw-r--r--eos_downloader/cvp.py85
-rw-r--r--eos_downloader/data.py92
-rw-r--r--eos_downloader/download.py33
-rw-r--r--eos_downloader/eos.py68
-rw-r--r--eos_downloader/models/version.py71
-rw-r--r--eos_downloader/object_downloader.py223
-rw-r--r--pylintrc3
-rw-r--r--pyproject.toml8
-rw-r--r--tests/lib/dataset.py143
-rw-r--r--tests/lib/fixtures.py35
-rw-r--r--tests/lib/helpers.py13
-rw-r--r--tests/system/test_eos_download.py.old1
-rw-r--r--tests/unit/test_eos_version.py152
-rw-r--r--tests/unit/test_object_downloader.py128
28 files changed, 1067 insertions, 853 deletions
diff --git a/.github/workflows/on_demand.yml b/.github/workflows/on_demand.yml
index 1e41be2..17223f9 100644
--- a/.github/workflows/on_demand.yml
+++ b/.github/workflows/on_demand.yml
@@ -14,11 +14,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
- uses: actions/checkout@v3
+ uses: actions/checkout@v4
- name: Docker meta for TAG
id: meta
- uses: docker/metadata-action@v4
+ uses: docker/metadata-action@v5
with:
images: |
${{ secrets.DOCKER_IMAGE }}
@@ -27,20 +27,20 @@ jobs:
type=raw,value=${{ inputs.tag }}
- name: Login to DockerHub
- uses: docker/login-action@v2
+ uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Login to GitHub Container Registry
- uses: docker/login-action@v2
+ uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push
- uses: docker/build-push-action@v4
+ uses: docker/build-push-action@v5
with:
context: .
file: Dockerfile
@@ -54,11 +54,11 @@ jobs:
needs: [docker]
steps:
- name: Checkout
- uses: actions/checkout@v3
+ uses: actions/checkout@v4
- name: Docker meta for TAG
id: meta
- uses: docker/metadata-action@v4
+ uses: docker/metadata-action@v5
with:
images: |
${{ secrets.DOCKER_IMAGE }}
@@ -67,20 +67,20 @@ jobs:
type=raw,value=${{ inputs.tag }}-dind
- name: Login to DockerHub
- uses: docker/login-action@v2
+ uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Login to GitHub Container Registry
- uses: docker/login-action@v2
+ uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push
- uses: docker/build-push-action@v4
+ uses: docker/build-push-action@v5
with:
context: .
file: Dockerfile.docker
diff --git a/.github/workflows/pr-management.yml b/.github/workflows/pr-management.yml
index 233166e..dc30ccf 100644
--- a/.github/workflows/pr-management.yml
+++ b/.github/workflows/pr-management.yml
@@ -8,15 +8,29 @@ on:
types: [assigned, opened, synchronize, reopened]
jobs:
+
+ pre-commit:
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ python-version: ["3.8", "3.9", "3.10"]
+ steps:
+ - uses: actions/checkout@v4
+ - uses: actions/setup-python@v4
+ with:
+ python-version: ${{ matrix.python-version }}
+ - uses: pre-commit-ci/lite-action@v1.0.1
+
compiling:
name: Run installation process and code compilation supported Python versions
runs-on: ubuntu-latest
+ needs: [pre-commit]
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10"]
steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
@@ -44,7 +58,7 @@ jobs:
python: ["3.8", "3.9", "3.10"]
steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v4
@@ -67,7 +81,7 @@ jobs:
python: ["3.8", "3.9", "3.10"]
steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v4
@@ -90,7 +104,7 @@ jobs:
python: ["3.8", "3.9", "3.10"]
steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v4
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 622a85d..e148e6b 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -11,7 +11,7 @@ jobs:
# runs-on: ubuntu-latest
# steps:
# - name: Checkout code
- # uses: actions/checkout@v3
+ # uses: actions/checkout@v4
# with:
# fetch-depth: 0
@@ -36,7 +36,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
- uses: actions/checkout@v3
+ uses: actions/checkout@v4
- name: Install dependencies
run: |
@@ -59,11 +59,11 @@ jobs:
needs: [pypi]
steps:
- name: Checkout
- uses: actions/checkout@v3
+ uses: actions/checkout@v4
- name: Docker meta for TAG
id: meta
- uses: docker/metadata-action@v4
+ uses: docker/metadata-action@v5
with:
images: |
${{ secrets.DOCKER_IMAGE }}
@@ -73,20 +73,20 @@ jobs:
type=raw,value=latest
- name: Login to DockerHub
- uses: docker/login-action@v2
+ uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Login to GitHub Container Registry
- uses: docker/login-action@v2
+ uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push
- uses: docker/build-push-action@v4
+ uses: docker/build-push-action@v5
with:
context: .
file: Dockerfile
@@ -100,11 +100,11 @@ jobs:
needs: [docker]
steps:
- name: Checkout
- uses: actions/checkout@v3
+ uses: actions/checkout@v4
- name: Docker meta for TAG
id: meta
- uses: docker/metadata-action@v4
+ uses: docker/metadata-action@v5
with:
images: |
${{ secrets.DOCKER_IMAGE }}
@@ -114,20 +114,20 @@ jobs:
type=raw,value=latest-dind
- name: Login to DockerHub
- uses: docker/login-action@v2
+ uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Login to GitHub Container Registry
- uses: docker/login-action@v2
+ uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push
- uses: docker/build-push-action@v4
+ uses: docker/build-push-action@v5
with:
context: .
file: Dockerfile.docker
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 0000000..513a2cc
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,64 @@
+---
+# See https://pre-commit.com for more information
+# See https://pre-commit.com/hooks.html for more hooks
+files: ^(eos_downloader)/
+
+repos:
+ - repo: https://github.com/pre-commit/pre-commit-hooks
+ rev: v4.4.0
+ hooks:
+ - id: trailing-whitespace
+ - id: end-of-file-fixer
+ - id: check-added-large-files
+ - id: check-merge-conflict
+
+ # - repo: https://github.com/pycqa/isort
+ # rev: 5.12.0
+ # hooks:
+ # - id: isort
+ # name: Check for changes when running isort on all python files
+
+ - repo: https://github.com/psf/black
+ rev: 23.7.0
+ hooks:
+ - id: black
+ name: Check for changes when running Black on all python files
+
+ - repo: https://github.com/pycqa/flake8
+ rev: 6.0.0
+ hooks:
+ - id: flake8
+ name: Check for PEP8 error on Python files
+ args:
+ - --config=/dev/null
+ - --max-line-length=165
+
+ - repo: local # as per https://pylint.pycqa.org/en/latest/user_guide/installation/pre-commit-integration.html
+ hooks:
+ - id: pylint
+ entry: pylint
+ language: python
+ name: Check for Linting error on Python files
+ description: This hook runs pylint.
+ types: [python]
+ args:
+ - -rn # Only display messages
+ - -sn # Don't display the score
+ - --rcfile=pylintrc # Link to config file
+
+ - repo: https://github.com/pre-commit/mirrors-mypy
+ rev: v1.4.1
+ hooks:
+ - id: mypy
+ args:
+ - --config-file=pyproject.toml
+ additional_dependencies:
+ - "click==8.1.3"
+ - "click-help-colors==0.9.1"
+ - "pydantic~=2.0"
+ - "PyYAML==6.0"
+ - "requests>=2.27"
+ - "rich~=13.4"
+ - types-paramiko
+ - types-requests
+ files: eos_downloader
diff --git a/README.md b/README.md
index b5abbc3..0809624 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,13 @@
- [![code-testing](https://github.com/titom73/eos-downloader/actions/workflows/pr-management.yml/badge.svg?event=push)](https://github.com/titom73/eos-downloader/actions/workflows/pr-management.yml) ![PyPI - Python Version](https://img.shields.io/pypi/pyversions/eos-downloader) ![GitHub release (latest SemVer)](https://img.shields.io/github/v/release/titom73/arista-downloader) ![PyPI - Downloads/month](https://img.shields.io/pypi/dm/eos-downloader)
+[![tests](https://github.com/titom73/eos-downloader/actions/workflows/pr-management.yml/badge.svg?event=push)](https://github.com/titom73/eos-downloader/actions/workflows/pr-management.yml)
+![PyPI - Python Version](https://img.shields.io/pypi/pyversions/eos-downloader)
+[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
+![Checked with mypy](http://www.mypy-lang.org/static/mypy_badge.svg)
+![GitHub release](https://img.shields.io/github/v/release/titom73/arista-downloader)
+![PyPI - Downloads/month](https://img.shields.io/pypi/dm/eos-downloader)
+
+<!--
+[![pre-commit](https://img.shields.io/badge/pre--commit-enabled-brightgreen?logo=pre-commit&logoColor=white)](https://github.com/pre-commit/pre-commit)
+!-->
# Arista Software Downloader
@@ -19,6 +28,7 @@ Usage: ardl [OPTIONS] COMMAND [ARGS]...
Arista Network Download CLI
Options:
+ --version Show the version and exit.
--token TEXT Arista Token from your customer account [env var:
ARISTA_TOKEN]
--help Show this message and exit.
@@ -26,7 +36,6 @@ Options:
Commands:
debug Debug commands to work with ardl
get Download Arista from Arista website
- version Display version of ardl
```
> **Warning**
@@ -35,10 +44,22 @@ Commands:
### Download EOS Package
-
+> **Note**
> Supported packages are: EOS, cEOS, vEOS-lab, cEOS64
-You can download EOS packages with following commands:
+CLI gives an option to get latest version available. By default it takes latest `F` release
+
+```bash
+ardl get eos --image-type cEOS --latest
+```
+
+If you want to get latest M release, you can use `--release-type`:
+
+```bash
+ardl get eos --image-type cEOS --release-type M --latest
+```
+
+You can download a specific EOS packages with following commands:
```bash
# Example for a cEOS package
@@ -164,7 +185,7 @@ tqdm
On EVE-NG, you may have to install/upgrade __pyOpenSSL__ in version `23.0.0`:
-```
+```bash
# Error when running ardl: AttributeError: module 'lib' has no attribute 'X509_V_FLAG_CB_ISSUER_CHECK'
$ pip install pyopenssl --upgrade
diff --git a/bin/README.md b/bin/README.md
deleted file mode 100644
index 7509633..0000000
--- a/bin/README.md
+++ /dev/null
@@ -1,111 +0,0 @@
-## scripts
-
-These scripts are deprecated and will be removed in a futur version. Please prefer the use of the CLI implemented in the package.
-
-### eos-download
-
-```bash
-usage: eos-download [-h]
- --version VERSION
- [--token TOKEN]
- [--image IMAGE]
- [--destination DESTINATION]
- [--eve]
- [--noztp]
- [--import_docker]
- [--docker_name DOCKER_NAME]
- [--verbose VERBOSE]
- [--log]
-
-EOS downloader script.
-
-optional arguments:
- -h, --help show this help message and exit
- --token TOKEN arista.com user API key - can use ENV:ARISTA_TOKEN
- --image IMAGE Type of EOS image required
- --version VERSION EOS version to download from website
- --destination DESTINATION
- Path where to save EOS package downloaded
- --eve Option to install EOS package to EVE-NG
- --noztp Option to deactivate ZTP when used with EVE-NG
- --import_docker Option to import cEOS image to docker
- --docker_name DOCKER_NAME
- Docker image name to use
- --verbose VERBOSE Script verbosity
- --log Option to activate logging to eos-downloader.log file
-```
-
-- Token are read from `ENV:ARISTA_TOKEN` unless you specify a specific token with CLI.
-
-- Supported platforms:
-
- - `INT`: International version
- - `64`: 64 bits version
- - `2GB` for 2GB flash platform
- - `2GB-INT`: for 2GB running International
- - `vEOS`: Virtual EOS image
- - `vEOS-lab`: Virtual Lab EOS
- - `vEOS64-lab`: Virtual Lab EOS running 64B
- - `cEOS`: Docker version of EOS
- - `cEOS64`: Docker version of EOS running in 64 bits
-
-#### Examples
-
-- Download vEOS-lab image and install in EVE-NG
-
-```bash
-$ eos-download --image vEOS-lab --version 4.25.7M --eve --noztp
-```
-
-- Download Docker image
-
-```bash
-$ eos-download --image cEOS --version 4.27.1F
-šŸŖ eos-downloader is starting...
- - Image Type: cEOS
- - Version: 4.27.2F
-āœ… Authenticated on arista.com
-šŸ”Ž Searching file cEOS-lab-4.27.2F.tar.xz
- -> Found file at /support/download/EOS-USA/Active Releases/4.27/EOS-4.27.2F/cEOS-lab/cEOS-lab-4.27.2F.tar.xz
-šŸ’¾ Downloading cEOS-lab-4.27.2F.tar.xz ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā” 100.0% ā€¢ 17.1 MB/s ā€¢ 451.6/451.6 MB ā€¢ 0:00:19 ā€¢
-šŸš€ Running checksum validation
-šŸ”Ž Searching file cEOS-lab-4.27.2F.tar.xz.sha512sum
- -> Found file at /support/download/EOS-USA/Active
-Releases/4.27/EOS-4.27.2F/cEOS-lab/cEOS-lab-4.27.2F.tar.xz.sha512sum
-šŸ’¾ Downloading cEOS-lab-4.27.2F.tar.xz.sha512sum ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā”ā” 100.0% ā€¢ ? ā€¢ 154/154 bytes ā€¢ 0:00:00 ā€¢
-āœ… Downloaded file is correct.
-```
-
-__Note:__ `ARISTA_TOKEN` should be set in your .profile and not set for each command. If not set, you can use `--token` knob.
-
-```bash
-# Export Token
-export ARISTA_TOKEN="xxxxxxx"
-```
-
-### Cloudvision Image uploader
-
-Create an image bundle on Cloudvision.
-
-```bash
-cvp-upload -h
-usage: cvp-upload [-h]
- [--token TOKEN]
- [--image IMAGE]
- --cloudvision CLOUDVISION
- [--create_bundle]
- [--timeout TIMEOUT]
- [--verbose VERBOSE]
-
-Cloudvision Image uploader script.
-
-optional arguments:
- -h, --help show this help message and exit
- --token TOKEN CVP Authentication token - can use ENV:ARISTA_AVD_CV_TOKEN
- --image IMAGE Type of EOS image required
- --cloudvision CLOUDVISION
- Cloudvision instance where to upload image
- --create_bundle Option to create image bundle with new uploaded image
- --timeout TIMEOUT Timeout connection. Default is set to 1200sec
- --verbose VERBOSE Script verbosity
-``` \ No newline at end of file
diff --git a/bin/cvp-upload b/bin/cvp-upload
deleted file mode 100755
index 74213fe..0000000
--- a/bin/cvp-upload
+++ /dev/null
@@ -1,56 +0,0 @@
-#!/usr/bin/python
-
-import sys
-import os
-import argparse
-from eos_downloader.cvp import CvFeatureManager, CvpAuthenticationItem
-from loguru import logger
-
-ARISTA_AVD_CV_TOKEN = os.getenv('ARISTA_AVD_CV_TOKEN', '')
-
-
-def read_cli():
- parser = argparse.ArgumentParser(description='Cloudvision Image uploader script.')
- parser.add_argument('--token', required=False,
- default=ARISTA_AVD_CV_TOKEN,
- help='CVP Authentication token - can use ENV:ARISTA_AVD_CV_TOKEN')
- parser.add_argument('--image', required=False,
- default='EOS', help='Type of EOS image required')
- parser.add_argument('--cloudvision', required=True,
- help='Cloudvision instance where to upload image')
- parser.add_argument('--create_bundle', required=False, action='store_true',
- help="Option to create image bundle with new uploaded image")
- parser.add_argument('--timeout', required=False,
- default=1200,
- help='Timeout connection. Default is set to 1200sec')
- parser.add_argument('--verbose', required=False,
- default='info', help='Script verbosity')
- return parser.parse_args()
-
-
-if __name__ == '__main__':
-
- cli_options = read_cli()
-
- logger.remove()
- logger.add(sys.stderr, level=str(cli_options.verbose).upper())
-
- cv_authentication = CvpAuthenticationItem(
- server=cli_options.cloudvision,
- token=cli_options.token,
- port=443,
- timeout=cli_options.timeout,
- validate_cert=False
- )
-
- my_cvp_uploader = CvFeatureManager(authentication=cv_authentication)
- result_upload = my_cvp_uploader.upload_image(cli_options.image)
- if result_upload and cli_options.create_bundle:
- bundle_name = os.path.basename(cli_options.image)
- logger.info('Creating image bundle {}'.format(bundle_name))
- my_cvp_uploader.create_bundle(
- name=bundle_name,
- images_name=[bundle_name]
- )
-
- sys.exit(0)
diff --git a/bin/eos-download b/bin/eos-download
deleted file mode 100755
index 9826b31..0000000
--- a/bin/eos-download
+++ /dev/null
@@ -1,86 +0,0 @@
-#!/usr/bin/python
-
-import sys
-import os
-import argparse
-import eos_downloader.eos
-from loguru import logger
-from rich.console import Console
-
-ARISTA_TOKEN = os.getenv('ARISTA_TOKEN', '')
-
-
-def read_cli():
- parser = argparse.ArgumentParser(description='EOS downloader script.')
- parser.add_argument('--token', required=False,
- default=ARISTA_TOKEN,
- help='arista.com user API key - can use ENV:ARISTA_TOKEN')
- parser.add_argument('--image', required=False,
- default='EOS', help='Type of EOS image required')
- parser.add_argument('--version', required=True,
- default='', help='EOS version to download from website')
-
- parser.add_argument('--destination', required=False,
- default=str(os.getcwd()),
- help='Path where to save EOS package downloaded')
-
- parser.add_argument('--eve', required=False, action='store_true',
- help="Option to install EOS package to EVE-NG")
- parser.add_argument('--noztp', required=False, action='store_true',
- help="Option to deactivate ZTP when used with EVE-NG")
-
- parser.add_argument('--import_docker', required=False, action='store_true',
- help="Option to import cEOS image to docker")
- parser.add_argument('--docker_name', required=False,
- default='arista/ceos',
- help='Docker image name to use')
-
- parser.add_argument('--verbose', required=False,
- default='info', help='Script verbosity')
- parser.add_argument('--log', required=False, action='store_true',
- help="Option to activate logging to eos-downloader.log file")
-
- return parser.parse_args()
-
-
-if __name__ == '__main__':
-
- cli_options = read_cli()
-
- console = Console()
-
- console.print('\n[red]WARNING: This script is now deprecated. Please use ardl cli instead[/red]\n\n')
-
- if cli_options.token is None or cli_options.token == '':
- console.print('\nā— Token is unset ! Please configure ARISTA_TOKEN or use --token option', style="bold red")
- sys.exit(1)
-
- logger.remove()
- if cli_options.log:
- logger.add("eos-downloader.log", rotation="10 MB", level=str(cli_options.verbose).upper())
-
- console.print("šŸŖ [bold blue]eos-downloader[/bold blue] is starting...", )
- console.print(f' - Image Type: {cli_options.image}')
- console.print(f' - Version: {cli_options.version}')
-
-
- my_download = eos_downloader.eos.EOSDownloader(
- image=cli_options.image,
- software='EOS',
- version=cli_options.version,
- token=cli_options.token,
- hash_method='sha512sum')
-
- my_download.authenticate()
-
- if cli_options.eve:
- my_download.provision_eve(noztp=cli_options.noztp, checksum=True)
- else:
- my_download.download_local(file_path=cli_options.destination, checksum=True)
-
- if cli_options.import_docker:
- my_download.docker_import(
- image_name=cli_options.docker_name
- )
- console.print('āœ… processing done !')
- sys.exit(0)
diff --git a/eos_downloader/__init__.py b/eos_downloader/__init__.py
index 345ccf7..4507a70 100644
--- a/eos_downloader/__init__.py
+++ b/eos_downloader/__init__.py
@@ -5,23 +5,31 @@
EOS Downloader module.
"""
-from __future__ import (absolute_import, division,
- print_function, unicode_literals, annotations)
+from __future__ import (
+ absolute_import,
+ annotations,
+ division,
+ print_function,
+ unicode_literals,
+)
+
import dataclasses
-from typing import Any
-import json
import importlib.metadata
+import json
+from typing import Any
-__author__ = '@titom73'
-__email__ = 'tom@inetsix.net'
-__date__ = '2022-03-16'
+__author__ = "@titom73"
+__email__ = "tom@inetsix.net"
+__date__ = "2022-03-16"
__version__ = importlib.metadata.version("eos-downloader")
# __all__ = ["CvpAuthenticationItem", "CvFeatureManager", "EOSDownloader", "ObjectDownloader", "reverse"]
ARISTA_GET_SESSION = "https://www.arista.com/custom_data/api/cvp/getSessionCode/"
-ARISTA_SOFTWARE_FOLDER_TREE = "https://www.arista.com/custom_data/api/cvp/getFolderTree/"
+ARISTA_SOFTWARE_FOLDER_TREE = (
+ "https://www.arista.com/custom_data/api/cvp/getFolderTree/"
+)
ARISTA_DOWNLOAD_URL = "https://www.arista.com/custom_data/api/cvp/getDownloadLink/"
@@ -36,11 +44,12 @@ check the Access Token. Then re-run the script with the correct token.
MSG_INVALID_DATA = """Invalid data returned by server
"""
-EVE_QEMU_FOLDER_PATH = '/opt/unetlab/addons/qemu/'
+EVE_QEMU_FOLDER_PATH = "/opt/unetlab/addons/qemu/"
class EnhancedJSONEncoder(json.JSONEncoder):
"""Custom JSon encoder."""
+
def default(self, o: Any) -> Any:
if dataclasses.is_dataclass(o):
return dataclasses.asdict(o)
diff --git a/eos_downloader/cli/cli.py b/eos_downloader/cli/cli.py
index ddd0dea..ad77f2b 100644
--- a/eos_downloader/cli/cli.py
+++ b/eos_downloader/cli/cli.py
@@ -11,49 +11,51 @@ ARDL CLI Baseline.
"""
import click
-from rich.console import Console
-import eos_downloader
-from eos_downloader.cli.get import commands as get_commands
+
+from eos_downloader import __version__
from eos_downloader.cli.debug import commands as debug_commands
+from eos_downloader.cli.get import commands as get_commands
from eos_downloader.cli.info import commands as info_commands
+from eos_downloader.cli.utils import AliasedGroup
+
-@click.group()
+@click.group(cls=AliasedGroup)
+@click.version_option(__version__)
@click.pass_context
-@click.option('--token', show_envvar=True, default=None, help='Arista Token from your customer account')
+@click.option(
+ "--token",
+ show_envvar=True,
+ default=None,
+ help="Arista Token from your customer account",
+)
def ardl(ctx: click.Context, token: str) -> None:
"""Arista Network Download CLI"""
ctx.ensure_object(dict)
- ctx.obj['token'] = token
+ ctx.obj["token"] = token
-@click.command()
-def version() -> None:
- """Display version of ardl"""
- console = Console()
- console.print(f'ardl is running version {eos_downloader.__version__}')
-
-
-@ardl.group(no_args_is_help=True)
+@ardl.group(cls=AliasedGroup, no_args_is_help=True)
@click.pass_context
-def get(ctx: click.Context) -> None:
+def get(ctx: click.Context, cls: click.Group = AliasedGroup) -> None:
# pylint: disable=redefined-builtin
"""Download Arista from Arista website"""
-@ardl.group(no_args_is_help=True)
+@ardl.group(cls=AliasedGroup, no_args_is_help=True)
@click.pass_context
-def info(ctx: click.Context) -> None:
+def info(ctx: click.Context, cls: click.Group = AliasedGroup) -> None:
# pylint: disable=redefined-builtin
"""List information from Arista website"""
-@ardl.group(no_args_is_help=True)
+@ardl.group(cls=AliasedGroup, no_args_is_help=True)
@click.pass_context
-def debug(ctx: click.Context) -> None:
+def debug(ctx: click.Context, cls: click.Group = AliasedGroup) -> None:
# pylint: disable=redefined-builtin
"""Debug commands to work with ardl"""
+
# ANTA CLI Execution
@@ -64,13 +66,9 @@ def cli() -> None:
get.add_command(get_commands.cvp)
info.add_command(info_commands.eos_versions)
debug.add_command(debug_commands.xml)
- ardl.add_command(version)
# Load CLI
- ardl(
- obj={},
- auto_envvar_prefix='arista'
- )
+ ardl(obj={}, auto_envvar_prefix="arista")
-if __name__ == '__main__':
+if __name__ == "__main__":
cli()
diff --git a/eos_downloader/cli/debug/commands.py b/eos_downloader/cli/debug/commands.py
index 107b8a0..5a0d7f8 100644
--- a/eos_downloader/cli/debug/commands.py
+++ b/eos_downloader/cli/debug/commands.py
@@ -22,32 +22,51 @@ import eos_downloader.eos
@click.command()
@click.pass_context
-@click.option('--output', default=str('arista.xml'), help='Path to save XML file', type=click.Path(), show_default=True)
-@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False))
+@click.option(
+ "--output",
+ default=str("arista.xml"),
+ help="Path to save XML file",
+ type=click.Path(),
+ show_default=True,
+)
+@click.option(
+ "--log-level",
+ "--log",
+ help="Logging level of the command",
+ default=None,
+ type=click.Choice(
+ ["debug", "info", "warning", "error", "critical"], case_sensitive=False
+ ),
+)
def xml(ctx: click.Context, output: str, log_level: str) -> None:
# sourcery skip: remove-unnecessary-cast
"""Extract XML directory structure"""
console = Console()
# Get from Context
- token = ctx.obj['token']
+ token = ctx.obj["token"]
logger.remove()
if log_level is not None:
logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
my_download = eos_downloader.eos.EOSDownloader(
- image='unset',
- software='EOS',
- version='unset',
+ image="unset",
+ software="EOS",
+ version="unset",
token=token,
- hash_method='sha512sum')
+ hash_method="sha512sum",
+ )
my_download.authenticate()
- xml_object: ET.ElementTree = my_download._get_folder_tree() # pylint: disable=protected-access
+ xml_object: ET.ElementTree = (
+ my_download.get_folder_tree()
+ ) # pylint: disable=protected-access
xml_content = xml_object.getroot()
- xmlstr = minidom.parseString(ET.tostring(xml_content)).toprettyxml(indent=" ", newl='')
- with open(output, "w", encoding='utf-8') as f:
+ xmlstr = minidom.parseString(ET.tostring(xml_content)).toprettyxml(
+ indent=" ", newl=""
+ )
+ with open(output, "w", encoding="utf-8") as f:
f.write(str(xmlstr))
- console.print(f'XML file saved in: { output }')
+ console.print(f"XML file saved in: { output }")
diff --git a/eos_downloader/cli/get/commands.py b/eos_downloader/cli/get/commands.py
index 13a8eec..b4525fe 100644
--- a/eos_downloader/cli/get/commands.py
+++ b/eos_downloader/cli/get/commands.py
@@ -21,68 +21,156 @@ from rich.console import Console
import eos_downloader.eos
from eos_downloader.models.version import BASE_VERSION_STR, RTYPE_FEATURE, RTYPES
-EOS_IMAGE_TYPE = ['64', 'INT', '2GB-INT', 'cEOS', 'cEOS64', 'vEOS', 'vEOS-lab', 'EOS-2GB', 'default']
-CVP_IMAGE_TYPE = ['ova', 'rpm', 'kvm', 'upgrade']
+EOS_IMAGE_TYPE = [
+ "64",
+ "INT",
+ "2GB-INT",
+ "cEOS",
+ "cEOS64",
+ "vEOS",
+ "vEOS-lab",
+ "EOS-2GB",
+ "default",
+]
+CVP_IMAGE_TYPE = ["ova", "rpm", "kvm", "upgrade"]
+
@click.command(no_args_is_help=True)
@click.pass_context
-@click.option('--image-type', default='default', help='EOS Image type', type=click.Choice(EOS_IMAGE_TYPE), required=True)
-@click.option('--version', default=None, help='EOS version', type=str, required=False)
-@click.option('--latest', '-l', is_flag=True, type=click.BOOL, default=False, help='Get latest version in given branch. If --branch is not use, get the latest branch with specific release type')
-@click.option('--release-type', '-rtype', type=click.Choice(RTYPES, case_sensitive=False), default=RTYPE_FEATURE, help='EOS release type to search')
-@click.option('--branch', '-b', type=click.STRING, default=None, help='EOS Branch to list releases')
-@click.option('--docker-name', default='arista/ceos', help='Docker image name (default: arista/ceos)', type=str, show_default=True)
-@click.option('--output', default=str(os.path.relpath(os.getcwd(), start=os.curdir)), help='Path to save image', type=click.Path(),show_default=True)
+@click.option(
+ "--image-type",
+ default="default",
+ help="EOS Image type",
+ type=click.Choice(EOS_IMAGE_TYPE),
+ required=True,
+)
+@click.option("--version", default=None, help="EOS version", type=str, required=False)
+@click.option(
+ "--latest",
+ "-l",
+ is_flag=True,
+ type=click.BOOL,
+ default=False,
+ help="Get latest version in given branch. If --branch is not use, get the latest branch with specific release type",
+)
+@click.option(
+ "--release-type",
+ "-rtype",
+ type=click.Choice(RTYPES, case_sensitive=False),
+ default=RTYPE_FEATURE,
+ help="EOS release type to search",
+)
+@click.option(
+ "--branch",
+ "-b",
+ type=click.STRING,
+ default=None,
+ help="EOS Branch to list releases",
+)
+@click.option(
+ "--docker-name",
+ default="arista/ceos",
+ help="Docker image name (default: arista/ceos)",
+ type=str,
+ show_default=True,
+)
+@click.option(
+ "--output",
+ default=str(os.path.relpath(os.getcwd(), start=os.curdir)),
+ help="Path to save image",
+ type=click.Path(),
+ show_default=True,
+)
# Debugging
-@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False))
+@click.option(
+ "--log-level",
+ "--log",
+ help="Logging level of the command",
+ default=None,
+ type=click.Choice(
+ ["debug", "info", "warning", "error", "critical"], case_sensitive=False
+ ),
+)
# Boolean triggers
-@click.option('--eve-ng', is_flag=True, help='Run EVE-NG vEOS provisioning (only if CLI runs on an EVE-NG server)', default=False)
-@click.option('--disable-ztp', is_flag=True, help='Disable ZTP process in vEOS image (only available with --eve-ng)', default=False)
-@click.option('--import-docker', is_flag=True, help='Import docker image (only available with --image_type cEOSlab)', default=False)
+@click.option(
+ "--eve-ng",
+ is_flag=True,
+ help="Run EVE-NG vEOS provisioning (only if CLI runs on an EVE-NG server)",
+ default=False,
+)
+@click.option(
+ "--disable-ztp",
+ is_flag=True,
+ help="Disable ZTP process in vEOS image (only available with --eve-ng)",
+ default=False,
+)
+@click.option(
+ "--import-docker",
+ is_flag=True,
+ help="Import docker image (only available with --image_type cEOSlab)",
+ default=False,
+)
def eos(
- ctx: click.Context, image_type: str, output: str, log_level: str, eve_ng: bool, disable_ztp: bool,
- import_docker: bool, docker_name: str, version: Union[str, None] = None, release_type: str = RTYPE_FEATURE,
- latest: bool = False, branch: Union[str,None] = None
- ) -> int:
+ ctx: click.Context,
+ image_type: str,
+ output: str,
+ log_level: str,
+ eve_ng: bool,
+ disable_ztp: bool,
+ import_docker: bool,
+ docker_name: str,
+ version: Union[str, None] = None,
+ release_type: str = RTYPE_FEATURE,
+ latest: bool = False,
+ branch: Union[str, None] = None,
+) -> int:
"""Download EOS image from Arista website"""
console = Console()
# Get from Context
- token = ctx.obj['token']
- if token is None or token == '':
- console.print('ā— Token is unset ! Please configure ARISTA_TOKEN or use --token option', style="bold red")
+ token = ctx.obj["token"]
+ if token is None or token == "":
+ console.print(
+ "ā— Token is unset ! Please configure ARISTA_TOKEN or use --token option",
+ style="bold red",
+ )
sys.exit(1)
logger.remove()
if log_level is not None:
logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
- console.print("šŸŖ [bold blue]eos-downloader[/bold blue] is starting...", )
- console.print(f' - Image Type: {image_type}')
- console.print(f' - Version: {version}')
-
+ console.print(
+ "šŸŖ [bold blue]eos-downloader[/bold blue] is starting...",
+ )
+ console.print(f" - Image Type: {image_type}")
+ console.print(f" - Version: {version}")
if version is not None:
my_download = eos_downloader.eos.EOSDownloader(
image=image_type,
- software='EOS',
+ software="EOS",
version=version,
token=token,
- hash_method='sha512sum')
+ hash_method="sha512sum",
+ )
my_download.authenticate()
elif latest:
my_download = eos_downloader.eos.EOSDownloader(
image=image_type,
- software='EOS',
- version='unset',
+ software="EOS",
+ version="unset",
token=token,
- hash_method='sha512sum')
+ hash_method="sha512sum",
+ )
my_download.authenticate()
if branch is None:
branch = str(my_download.latest_branch(rtype=release_type).branch)
latest_version = my_download.latest_eos(branch, rtype=release_type)
if str(latest_version) == BASE_VERSION_STR:
- console.print(f'[red]Error[/red], cannot find any version in {branch} for {release_type} release type')
+ console.print(
+ f"[red]Error[/red], cannot find any version in {branch} for {release_type} release type"
+ )
sys.exit(1)
my_download.version = str(latest_version)
@@ -92,46 +180,71 @@ def eos(
my_download.download_local(file_path=output, checksum=True)
if import_docker:
- my_download.docker_import(
- image_name=docker_name
- )
- console.print('āœ… processing done !')
+ my_download.docker_import(image_name=docker_name)
+ console.print("āœ… processing done !")
sys.exit(0)
-
@click.command(no_args_is_help=True)
@click.pass_context
-@click.option('--format', default='upgrade', help='CVP Image type', type=click.Choice(CVP_IMAGE_TYPE), required=True)
-@click.option('--version', default=None, help='CVP version', type=str, required=True)
-@click.option('--output', default=str(os.path.relpath(os.getcwd(), start=os.curdir)), help='Path to save image', type=click.Path(),show_default=True)
-@click.option('--log-level', '--log', help='Logging level of the command', default=None, type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False))
-def cvp(ctx: click.Context, version: str, format: str, output: str, log_level: str) -> int:
+@click.option(
+ "--format",
+ default="upgrade",
+ help="CVP Image type",
+ type=click.Choice(CVP_IMAGE_TYPE),
+ required=True,
+)
+@click.option("--version", default=None, help="CVP version", type=str, required=True)
+@click.option(
+ "--output",
+ default=str(os.path.relpath(os.getcwd(), start=os.curdir)),
+ help="Path to save image",
+ type=click.Path(),
+ show_default=True,
+)
+@click.option(
+ "--log-level",
+ "--log",
+ help="Logging level of the command",
+ default=None,
+ type=click.Choice(
+ ["debug", "info", "warning", "error", "critical"], case_sensitive=False
+ ),
+)
+def cvp(
+ ctx: click.Context, version: str, format: str, output: str, log_level: str
+) -> int:
"""Download CVP image from Arista website"""
console = Console()
# Get from Context
- token = ctx.obj['token']
- if token is None or token == '':
- console.print('ā— Token is unset ! Please configure ARISTA_TOKEN or use --token option', style="bold red")
+ token = ctx.obj["token"]
+ if token is None or token == "":
+ console.print(
+ "ā— Token is unset ! Please configure ARISTA_TOKEN or use --token option",
+ style="bold red",
+ )
sys.exit(1)
logger.remove()
if log_level is not None:
logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
- console.print("šŸŖ [bold blue]eos-downloader[/bold blue] is starting...", )
- console.print(f' - Image Type: {format}')
- console.print(f' - Version: {version}')
+ console.print(
+ "šŸŖ [bold blue]eos-downloader[/bold blue] is starting...",
+ )
+ console.print(f" - Image Type: {format}")
+ console.print(f" - Version: {version}")
my_download = eos_downloader.eos.EOSDownloader(
image=format,
- software='CloudVision',
+ software="CloudVision",
version=version,
token=token,
- hash_method='md5sum')
+ hash_method="md5sum",
+ )
my_download.authenticate()
my_download.download_local(file_path=output, checksum=False)
- console.print('āœ… processing done !')
+ console.print("āœ… processing done !")
sys.exit(0)
diff --git a/eos_downloader/cli/info/commands.py b/eos_downloader/cli/info/commands.py
index b51003b..64097a1 100644
--- a/eos_downloader/cli/info/commands.py
+++ b/eos_downloader/cli/info/commands.py
@@ -24,12 +24,53 @@ from eos_downloader.models.version import BASE_VERSION_STR, RTYPE_FEATURE, RTYPE
@click.command(no_args_is_help=True)
@click.pass_context
-@click.option('--latest', '-l', is_flag=True, type=click.BOOL, default=False, help='Get latest version in given branch. If --branch is not use, get the latest branch with specific release type')
-@click.option('--release-type', '-rtype', type=click.Choice(RTYPES, case_sensitive=False), default=RTYPE_FEATURE, help='EOS release type to search')
-@click.option('--branch', '-b', type=click.STRING, default=None, help='EOS Branch to list releases')
-@click.option('--verbose', '-v', is_flag=True, type=click.BOOL, default=False, help='Human readable output. Default is none to use output in script)')
-@click.option('--log-level', '--log', help='Logging level of the command', default='warning', type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False))
-def eos_versions(ctx: click.Context, log_level: str, branch: Union[str,None] = None, release_type: str = RTYPE_FEATURE, latest: bool = False, verbose: bool = False) -> None:
+@click.option(
+ "--latest",
+ "-l",
+ is_flag=True,
+ type=click.BOOL,
+ default=False,
+ help="Get latest version in given branch. If --branch is not use, get the latest branch with specific release type",
+)
+@click.option(
+ "--release-type",
+ "-rtype",
+ type=click.Choice(RTYPES, case_sensitive=False),
+ default=RTYPE_FEATURE,
+ help="EOS release type to search",
+)
+@click.option(
+ "--branch",
+ "-b",
+ type=click.STRING,
+ default=None,
+ help="EOS Branch to list releases",
+)
+@click.option(
+ "--verbose",
+ "-v",
+ is_flag=True,
+ type=click.BOOL,
+ default=False,
+ help="Human readable output. Default is none to use output in script)",
+)
+@click.option(
+ "--log-level",
+ "--log",
+ help="Logging level of the command",
+ default="warning",
+ type=click.Choice(
+ ["debug", "info", "warning", "error", "critical"], case_sensitive=False
+ ),
+)
+def eos_versions(
+ ctx: click.Context,
+ log_level: str,
+ branch: Union[str, None] = None,
+ release_type: str = RTYPE_FEATURE,
+ latest: bool = False,
+ verbose: bool = False,
+) -> None:
# pylint: disable = too-many-branches
"""
List Available EOS version on Arista.com website.
@@ -42,22 +83,23 @@ def eos_versions(ctx: click.Context, log_level: str, branch: Union[str,None] = N
"""
console = Console()
# Get from Context
- token = ctx.obj['token']
+ token = ctx.obj["token"]
logger.remove()
if log_level is not None:
logger.add("eos-downloader.log", rotation="10 MB", level=log_level.upper())
my_download = eos_downloader.eos.EOSDownloader(
- image='unset',
- software='EOS',
- version='unset',
+ image="unset",
+ software="EOS",
+ version="unset",
token=token,
- hash_method='sha512sum')
+ hash_method="sha512sum",
+ )
auth = my_download.authenticate()
if verbose and auth:
- console.print('āœ… Authenticated on arista.com')
+ console.print("āœ… Authenticated on arista.com")
if release_type is not None:
release_type = release_type.upper()
@@ -67,21 +109,27 @@ def eos_versions(ctx: click.Context, log_level: str, branch: Union[str,None] = N
branch = str(my_download.latest_branch(rtype=release_type).branch)
latest_version = my_download.latest_eos(branch, rtype=release_type)
if str(latest_version) == BASE_VERSION_STR:
- console.print(f'[red]Error[/red], cannot find any version in {branch} for {release_type} release type')
+ console.print(
+ f"[red]Error[/red], cannot find any version in {branch} for {release_type} release type"
+ )
sys.exit(1)
if verbose:
- console.print(f'Branch {branch} has been selected with release type {release_type}')
+ console.print(
+ f"Branch {branch} has been selected with release type {release_type}"
+ )
if branch is not None:
- console.print(f'Latest release for {branch}: {latest_version}')
+ console.print(f"Latest release for {branch}: {latest_version}")
else:
- console.print(f'Latest EOS release: {latest_version}')
+ console.print(f"Latest EOS release: {latest_version}")
else:
- console.print(f'{ latest_version }')
+ console.print(f"{ latest_version }")
else:
versions = my_download.get_eos_versions(branch=branch, rtype=release_type)
if verbose:
- console.print(f'List of available versions for {branch if branch is not None else "all branches"}')
+ console.print(
+ f'List of available versions for {branch if branch is not None else "all branches"}'
+ )
for version in versions:
- console.print(f' ā†’ {str(version)}')
+ console.print(f" ā†’ {str(version)}")
else:
pprint([str(version) for version in versions])
diff --git a/eos_downloader/cli/utils.py b/eos_downloader/cli/utils.py
new file mode 100644
index 0000000..4a14f53
--- /dev/null
+++ b/eos_downloader/cli/utils.py
@@ -0,0 +1,38 @@
+#!/usr/bin/python
+# coding: utf-8 -*-
+# pylint: disable=inconsistent-return-statements
+
+
+"""
+Extension for the python ``click`` module
+to provide a group or command with aliases.
+"""
+
+
+from typing import Any
+import click
+
+
+class AliasedGroup(click.Group):
+ """
+ Implements a subclass of Group that accepts a prefix for a command.
+ If there were a command called push, it would accept pus as an alias (so long as it was unique)
+ """
+ def get_command(self, ctx: click.Context, cmd_name: str) -> Any:
+ """Documentation to build"""
+ rv = click.Group.get_command(self, ctx, cmd_name)
+ if rv is not None:
+ return rv
+ matches = [x for x in self.list_commands(ctx)
+ if x.startswith(cmd_name)]
+ if not matches:
+ return None
+ if len(matches) == 1:
+ return click.Group.get_command(self, ctx, matches[0])
+ ctx.fail(f"Too many matches: {', '.join(sorted(matches))}")
+
+ def resolve_command(self, ctx: click.Context, args: Any) -> Any:
+ """Documentation to build"""
+ # always return the full command name
+ _, cmd, args = super().resolve_command(ctx, args)
+ return cmd.name, cmd, args
diff --git a/eos_downloader/cvp.py b/eos_downloader/cvp.py
index 6f14eb0..678daa8 100644
--- a/eos_downloader/cvp.py
+++ b/eos_downloader/cvp.py
@@ -6,11 +6,12 @@ CVP Uploader content
"""
import os
-from typing import List, Optional, Any
from dataclasses import dataclass
-from loguru import logger
+from typing import Any, List, Optional
+
from cvprac.cvp_client import CvpClient
from cvprac.cvp_client_errors import CvpLoginError
+from loguru import logger
# from eos_downloader.tools import exc_to_str
@@ -20,8 +21,9 @@ from cvprac.cvp_client_errors import CvpLoginError
@dataclass
class CvpAuthenticationItem:
"""
- Data structure to represent Cloudvision Authentication
+ Data structure to represent Cloudvision Authentication
"""
+
server: str
port: int = 443
token: Optional[str] = None
@@ -29,15 +31,16 @@ class CvpAuthenticationItem:
validate_cert: bool = False
-class Filer():
+class Filer:
# pylint: disable=too-few-public-methods
"""
Filer Helper for file management
"""
+
def __init__(self, path: str) -> None:
self.file_exist = False
- self.filename = ''
- self.absolute_path = ''
+ self.filename = ""
+ self.absolute_path = ""
self.relative_path = path
if os.path.exists(path):
self.file_exist = True
@@ -45,13 +48,14 @@ class Filer():
self.absolute_path = os.path.realpath(path)
def __repr__(self) -> str:
- return self.absolute_path if self.file_exist else ''
+ return self.absolute_path if self.file_exist else ""
-class CvFeatureManager():
+class CvFeatureManager:
"""
CvFeatureManager Object to interect with Cloudvision
"""
+
def __init__(self, authentication: CvpAuthenticationItem) -> None:
"""
__init__ Class Creator
@@ -86,19 +90,21 @@ class CvFeatureManager():
try:
client.connect(
nodes=[authentication.server],
- username='',
- password='',
+ username="",
+ password="",
api_token=authentication.token,
is_cvaas=True,
port=authentication.port,
cert=authentication.validate_cert,
- request_timeout=authentication.timeout
+ request_timeout=authentication.timeout,
)
except CvpLoginError as error_data:
- logger.error(f'Cannot connect to Cloudvision server {authentication.server}')
- logger.debug(f'Error message: {error_data}')
- logger.info('connected to Cloudvision server')
- logger.debug(f'Connection info: {authentication}')
+ logger.error(
+ f"Cannot connect to Cloudvision server {authentication.server}"
+ )
+ logger.debug(f"Error message: {error_data}")
+ logger.info("connected to Cloudvision server")
+ logger.debug(f"Connection info: {authentication}")
return client
def __get_images(self) -> List[Any]:
@@ -111,8 +117,8 @@ class CvFeatureManager():
Fact returned by Cloudvision
"""
images = []
- logger.debug(' -> Collecting images')
- images = self._cv_instance.api.get_images()['data']
+ logger.debug(" -> Collecting images")
+ images = self._cv_instance.api.get_images()["data"]
return images if self.__check_api_result(images) else []
# def __get_bundles(self):
@@ -161,7 +167,11 @@ class CvFeatureManager():
bool
True if present
"""
- return any(image_name == image['name'] for image in self._cv_images) if isinstance(self._cv_images, list) else False
+ return (
+ any(image_name == image["name"] for image in self._cv_images)
+ if isinstance(self._cv_images, list)
+ else False
+ )
def _does_bundle_exist(self, bundle_name: str) -> bool:
# pylint: disable=unused-argument
@@ -192,19 +202,23 @@ class CvFeatureManager():
"""
image_item = Filer(path=image_path)
if image_item.file_exist is False:
- logger.error(f'File not found: {image_item.relative_path}')
+ logger.error(f"File not found: {image_item.relative_path}")
return False
- logger.info(f'File path for image: {image_item}')
+ logger.info(f"File path for image: {image_item}")
if self._does_image_exist(image_name=image_item.filename):
- logger.error("Image found in Cloudvision , Please delete it before running this script")
+ logger.error(
+ "Image found in Cloudvision , Please delete it before running this script"
+ )
return False
try:
- upload_result = self._cv_instance.api.add_image(filepath=image_item.absolute_path)
+ upload_result = self._cv_instance.api.add_image(
+ filepath=image_item.absolute_path
+ )
except Exception as e: # pylint: disable=broad-exception-caught
- logger.error('An error occurred during upload, check CV connection')
- logger.error(f'Exception message is: {e}')
+ logger.error("An error occurred during upload, check CV connection")
+ logger.error(f"Exception message is: {e}")
return False
- logger.debug(f'Upload Result is : {upload_result}')
+ logger.debug(f"Upload Result is : {upload_result}")
return True
def build_image_list(self, image_list: List[str]) -> List[Any]:
@@ -252,25 +266,30 @@ class CvFeatureManager():
bool
True if succeeds
"""
- logger.debug(f'Init creation of an image bundle {name} with following images {images_name}')
+ logger.debug(
+ f"Init creation of an image bundle {name} with following images {images_name}"
+ )
all_images_present: List[bool] = []
self._cv_images = self.__get_images()
all_images_present.extend(
- self._does_image_exist(image_name=image_name)
- for image_name in images_name
+ self._does_image_exist(image_name=image_name) for image_name in images_name
)
# Bundle Create
if self._does_bundle_exist(bundle_name=name) is False:
- logger.debug(f'Creating image bundle {name} with following images {images_name}')
+ logger.debug(
+ f"Creating image bundle {name} with following images {images_name}"
+ )
images_data = self.build_image_list(image_list=images_name)
if images_data is not None:
- logger.debug('Images information: {images_data}')
+ logger.debug("Images information: {images_data}")
try:
- data = self._cv_instance.api.save_image_bundle(name=name, images=images_data)
+ data = self._cv_instance.api.save_image_bundle(
+ name=name, images=images_data
+ )
except Exception as e: # pylint: disable=broad-exception-caught
- logger.critical(f'{e}')
+ logger.critical(f"{e}")
else:
logger.debug(data)
return True
- logger.critical('No data found for images')
+ logger.critical("No data found for images")
return False
diff --git a/eos_downloader/data.py b/eos_downloader/data.py
index 74f2f8e..ba54b3b 100644
--- a/eos_downloader/data.py
+++ b/eos_downloader/data.py
@@ -12,82 +12,22 @@ Data are built from content of Arista XML file
# [platform][image][version]
DATA_MAPPING = {
"CloudVision": {
- "ova": {
- "extension": ".ova",
- "prepend": "cvp",
- "folder_level": 0
- },
- "rpm": {
- "extension": "",
- "prepend": "cvp-rpm-installer",
- "folder_level": 0
- },
- "kvm": {
- "extension": "-kvm.tgz",
- "prepend": "cvp",
- "folder_level": 0
- },
- "upgrade": {
- "extension": ".tgz",
- "prepend": "cvp-upgrade",
- "folder_level": 0
- },
+ "ova": {"extension": ".ova", "prepend": "cvp", "folder_level": 0},
+ "rpm": {"extension": "", "prepend": "cvp-rpm-installer", "folder_level": 0},
+ "kvm": {"extension": "-kvm.tgz", "prepend": "cvp", "folder_level": 0},
+ "upgrade": {"extension": ".tgz", "prepend": "cvp-upgrade", "folder_level": 0},
},
"EOS": {
- "64": {
- "extension": ".swi",
- "prepend": "EOS64",
- "folder_level": 0
- },
- "INT": {
- "extension": "-INT.swi",
- "prepend": "EOS",
- "folder_level": 1
- },
- "2GB-INT": {
- "extension": "-INT.swi",
- "prepend": "EOS-2GB",
- "folder_level": 1
- },
- "cEOS": {
- "extension": ".tar.xz",
- "prepend": "cEOS-lab",
- "folder_level": 0
- },
- "cEOS64": {
- "extension": ".tar.xz",
- "prepend": "cEOS64-lab",
- "folder_level": 0
- },
- "vEOS": {
- "extension": ".vmdk",
- "prepend": "vEOS",
- "folder_level": 0
- },
- "vEOS-lab": {
- "extension": ".vmdk",
- "prepend": "vEOS-lab",
- "folder_level": 0
- },
- "EOS-2GB": {
- "extension": ".swi",
- "prepend": "EOS-2GB",
- "folder_level": 0
- },
- "RN": {
- "extension": "-",
- "prepend": "RN",
- "folder_level": 0
- },
- "SOURCE": {
- "extension": "-source.tar",
- "prepend": "EOS",
- "folder_level": 0
- },
- "default": {
- "extension": ".swi",
- "prepend": "EOS",
- "folder_level": 0
- }
- }
+ "64": {"extension": ".swi", "prepend": "EOS64", "folder_level": 0},
+ "INT": {"extension": "-INT.swi", "prepend": "EOS", "folder_level": 1},
+ "2GB-INT": {"extension": "-INT.swi", "prepend": "EOS-2GB", "folder_level": 1},
+ "cEOS": {"extension": ".tar.xz", "prepend": "cEOS-lab", "folder_level": 0},
+ "cEOS64": {"extension": ".tar.xz", "prepend": "cEOS64-lab", "folder_level": 0},
+ "vEOS": {"extension": ".vmdk", "prepend": "vEOS", "folder_level": 0},
+ "vEOS-lab": {"extension": ".vmdk", "prepend": "vEOS-lab", "folder_level": 0},
+ "EOS-2GB": {"extension": ".swi", "prepend": "EOS-2GB", "folder_level": 0},
+ "RN": {"extension": "-", "prepend": "RN", "folder_level": 0},
+ "SOURCE": {"extension": "-source.tar", "prepend": "EOS", "folder_level": 0},
+ "default": {"extension": ".swi", "prepend": "EOS", "folder_level": 0},
+ },
}
diff --git a/eos_downloader/download.py b/eos_downloader/download.py
index 2297b04..2c9576e 100644
--- a/eos_downloader/download.py
+++ b/eos_downloader/download.py
@@ -8,13 +8,20 @@ import os.path
import signal
from concurrent.futures import ThreadPoolExecutor
from threading import Event
-from typing import Iterable, Any
+from typing import Any, Iterable
import requests
import rich
from rich import console
-from rich.progress import (BarColumn, DownloadColumn, Progress, TaskID,
- TextColumn, TimeElapsedColumn, TransferSpeedColumn)
+from rich.progress import (
+ BarColumn,
+ DownloadColumn,
+ Progress,
+ TaskID,
+ TextColumn,
+ TimeElapsedColumn,
+ TransferSpeedColumn,
+)
console = rich.get_console()
done_event = Event()
@@ -28,7 +35,7 @@ def handle_sigint(signum: Any, frame: Any) -> None:
signal.signal(signal.SIGINT, handle_sigint)
-class DownloadProgressBar():
+class DownloadProgressBar:
"""
Object to manage Download process with Progress Bar from Rich
"""
@@ -38,7 +45,9 @@ class DownloadProgressBar():
Class Constructor
"""
self.progress = Progress(
- TextColumn("šŸ’¾ Downloading [bold blue]{task.fields[filename]}", justify="right"),
+ TextColumn(
+ "šŸ’¾ Downloading [bold blue]{task.fields[filename]}", justify="right"
+ ),
BarColumn(bar_width=None),
"[progress.percentage]{task.percentage:>3.1f}%",
"ā€¢",
@@ -48,14 +57,16 @@ class DownloadProgressBar():
"ā€¢",
TimeElapsedColumn(),
"ā€¢",
- console=console
+ console=console,
)
- def _copy_url(self, task_id: TaskID, url: str, path: str, block_size: int = 1024) -> bool:
+ def _copy_url(
+ self, task_id: TaskID, url: str, path: str, block_size: int = 1024
+ ) -> bool:
"""Copy data from a url to a local file."""
response = requests.get(url, stream=True, timeout=5)
# This will break if the response doesn't contain content length
- self.progress.update(task_id, total=int(response.headers['Content-Length']))
+ self.progress.update(task_id, total=int(response.headers["Content-Length"]))
with open(path, "wb") as dest_file:
self.progress.start_task(task_id)
for data in response.iter_content(chunk_size=block_size):
@@ -71,7 +82,9 @@ class DownloadProgressBar():
with self.progress:
with ThreadPoolExecutor(max_workers=4) as pool:
for url in urls:
- filename = url.split("/")[-1].split('?')[0]
+ filename = url.split("/")[-1].split("?")[0]
dest_path = os.path.join(dest_dir, filename)
- task_id = self.progress.add_task("download", filename=filename, start=False)
+ task_id = self.progress.add_task(
+ "download", filename=filename, start=False
+ )
pool.submit(self._copy_url, task_id, url, dest_path)
diff --git a/eos_downloader/eos.py b/eos_downloader/eos.py
index e5f3670..716992f 100644
--- a/eos_downloader/eos.py
+++ b/eos_downloader/eos.py
@@ -14,13 +14,20 @@ import rich
from loguru import logger
from rich import console
-from eos_downloader.models.version import BASE_BRANCH_STR, BASE_VERSION_STR, REGEX_EOS_VERSION, RTYPE_FEATURE, EosVersion
+from eos_downloader.models.version import (
+ BASE_BRANCH_STR,
+ BASE_VERSION_STR,
+ REGEX_EOS_VERSION,
+ RTYPE_FEATURE,
+ EosVersion,
+)
from eos_downloader.object_downloader import ObjectDownloader
# logger = logging.getLogger(__name__)
console = rich.get_console()
+
class EOSDownloader(ObjectDownloader):
"""
EOSDownloader Object to download EOS images from Arista.com website
@@ -47,22 +54,27 @@ class EOSDownloader(ObjectDownloader):
file_path : str
Path where EOS image is located
"""
- logger.info('Mounting volume to disable ZTP')
- console.print('šŸš€ Mounting volume to disable ZTP')
+ logger.info("Mounting volume to disable ZTP")
+ console.print("šŸš€ Mounting volume to disable ZTP")
raw_folder = os.path.join(file_path, "raw")
os.system(f"rm -rf {raw_folder}")
os.system(f"mkdir -p {raw_folder}")
os.system(
- f'guestmount -a {os.path.join(file_path, "hda.qcow2")} -m /dev/sda2 {os.path.join(file_path, "raw")}')
- ztp_file = os.path.join(file_path, 'raw/zerotouch-config')
- with open(ztp_file, 'w', encoding='ascii') as zfile:
- zfile.write('DISABLE=True')
- logger.info(f'Unmounting volume in {file_path}')
+ f'guestmount -a {os.path.join(file_path, "hda.qcow2")} -m /dev/sda2 {os.path.join(file_path, "raw")}'
+ )
+ ztp_file = os.path.join(file_path, "raw/zerotouch-config")
+ with open(ztp_file, "w", encoding="ascii") as zfile:
+ zfile.write("DISABLE=True")
+ logger.info(f"Unmounting volume in {file_path}")
os.system(f"guestunmount {os.path.join(file_path, 'raw')}")
os.system(f"rm -rf {os.path.join(file_path, 'raw')}")
logger.info(f"Volume has been successfully unmounted at {file_path}")
- def _parse_xml_for_version(self,root_xml: ET.ElementTree, xpath: str = './/dir[@label="Active Releases"]/dir/dir/[@label]') -> List[EosVersion]:
+ def _parse_xml_for_version(
+ self,
+ root_xml: ET.ElementTree,
+ xpath: str = './/dir[@label="Active Releases"]/dir/dir/[@label]',
+ ) -> List[EosVersion]:
"""
Extract list of available EOS versions from Arista.com website
@@ -77,19 +89,21 @@ class EOSDownloader(ObjectDownloader):
"""
# XPATH: .//dir[@label="Active Releases"]/dir/dir/[@label]
if self.eos_versions is None:
- logger.debug(f'Using xpath {xpath}')
+ logger.debug(f"Using xpath {xpath}")
eos_versions = []
for node in root_xml.findall(xpath):
- if 'label' in node.attrib and node.get('label') is not None:
- label = node.get('label')
+ if "label" in node.attrib and node.get("label") is not None:
+ label = node.get("label")
if label is not None and REGEX_EOS_VERSION.match(label):
eos_version = EosVersion.from_str(label)
eos_versions.append(eos_version)
logger.debug(f"Found {label} - {eos_version}")
- logger.debug(f'List of versions found on arista.com is: {eos_versions}')
+ logger.debug(f"List of versions found on arista.com is: {eos_versions}")
self.eos_versions = eos_versions
else:
- logger.debug('receiving instruction to download versions, but already available')
+ logger.debug(
+ "receiving instruction to download versions, but already available"
+ )
return self.eos_versions
def _get_branches(self, with_rtype: str = RTYPE_FEATURE) -> List[str]:
@@ -104,9 +118,11 @@ class EOSDownloader(ObjectDownloader):
Returns:
List[str]: A lsit of string that represent all availables EOS branches
"""
- root = self._get_folder_tree()
+ root = self.get_folder_tree()
versions = self._parse_xml_for_version(root_xml=root)
- return list({version.branch for version in versions if version.rtype == with_rtype})
+ return list(
+ {version.branch for version in versions if version.rtype == with_rtype}
+ )
def latest_branch(self, rtype: str = RTYPE_FEATURE) -> EosVersion:
"""
@@ -125,7 +141,9 @@ class EOSDownloader(ObjectDownloader):
selected_branch = branch
return selected_branch
- def get_eos_versions(self, branch: Union[str,None] = None, rtype: Union[str,None] = None) -> List[EosVersion]:
+ def get_eos_versions(
+ self, branch: Union[str, None] = None, rtype: Union[str, None] = None
+ ) -> List[EosVersion]:
"""
Get a list of available EOS version available on arista.com
@@ -139,16 +157,22 @@ class EOSDownloader(ObjectDownloader):
Returns:
List[EosVersion]: A list of versions available
"""
- root = self._get_folder_tree()
+ root = self.get_folder_tree()
result = []
for version in self._parse_xml_for_version(root_xml=root):
if branch is None and (version.rtype == rtype or rtype is None):
result.append(version)
- elif branch is not None and version.is_in_branch(branch) and version.rtype == rtype:
+ elif (
+ branch is not None
+ and version.is_in_branch(branch)
+ and version.rtype == rtype
+ ):
result.append(version)
return result
- def latest_eos(self, branch: Union[str,None] = None, rtype: str = RTYPE_FEATURE) -> EosVersion:
+ def latest_eos(
+ self, branch: Union[str, None] = None, rtype: str = RTYPE_FEATURE
+ ) -> EosVersion:
"""
Get latest version of EOS
@@ -168,7 +192,9 @@ class EOSDownloader(ObjectDownloader):
latest_branch = self.latest_branch(rtype=rtype)
else:
latest_branch = EosVersion.from_str(branch)
- for version in self.get_eos_versions(branch=str(latest_branch.branch), rtype=rtype):
+ for version in self.get_eos_versions(
+ branch=str(latest_branch.branch), rtype=rtype
+ ):
if version > selected_version:
if rtype is not None and version.rtype == rtype:
selected_version = version
diff --git a/eos_downloader/models/version.py b/eos_downloader/models/version.py
index 4b051a5..22de100 100644
--- a/eos_downloader/models/version.py
+++ b/eos_downloader/models/version.py
@@ -16,11 +16,11 @@ from eos_downloader.tools import exc_to_str
# logger = logging.getLogger(__name__)
-BASE_VERSION_STR = '4.0.0F'
-BASE_BRANCH_STR = '4.0'
+BASE_VERSION_STR = "4.0.0F"
+BASE_BRANCH_STR = "4.0"
-RTYPE_FEATURE = 'F'
-RTYPE_MAINTENANCE = 'M'
+RTYPE_FEATURE = "F"
+RTYPE_MAINTENANCE = "M"
RTYPES = [RTYPE_FEATURE, RTYPE_MAINTENANCE]
# Regular Expression to capture multiple EOS version format
@@ -29,8 +29,12 @@ RTYPES = [RTYPE_FEATURE, RTYPE_MAINTENANCE]
# 4.21.1M
# 4.28.10.F
# 4.28.6.1M
-REGEX_EOS_VERSION = re.compile(r"^.*(?P<major>4)\.(?P<minor>\d{1,2})\.(?P<patch>\d{1,2})(?P<other>\.\d*)*(?P<rtype>[M,F])*$")
-REGEX_EOS_BRANCH = re.compile(r"^.*(?P<major>4)\.(?P<minor>\d{1,2})(\.?P<patch>\d)*(\.\d)*(?P<rtype>[M,F])*$")
+REGEX_EOS_VERSION = re.compile(
+ r"^.*(?P<major>4)\.(?P<minor>\d{1,2})\.(?P<patch>\d{1,2})(?P<other>\.\d*)*(?P<rtype>[M,F])*$"
+)
+REGEX_EOS_BRANCH = re.compile(
+ r"^.*(?P<major>4)\.(?P<minor>\d{1,2})(\.?P<patch>\d)*(\.\d)*(?P<rtype>[M,F])*$"
+)
class EosVersion(BaseModel):
@@ -59,10 +63,11 @@ class EosVersion(BaseModel):
Args:
BaseModel (Pydantic): Pydantic Base Model
"""
+
major: int = 4
minor: int = 0
patch: int = 0
- rtype: Optional[str] = 'F'
+ rtype: Optional[str] = "F"
other: Any = None
@classmethod
@@ -84,7 +89,7 @@ class EosVersion(BaseModel):
Returns:
EosVersion object
"""
- logger.debug(f'receiving version: {eos_version}')
+ logger.debug(f"receiving version: {eos_version}")
if REGEX_EOS_VERSION.match(eos_version):
matches = REGEX_EOS_VERSION.match(eos_version)
# assert matches is not None
@@ -95,7 +100,7 @@ class EosVersion(BaseModel):
# assert matches is not None
assert matches is not None
return cls(**matches.groupdict())
- logger.error(f'Error occured with {eos_version}')
+ logger.error(f"Error occured with {eos_version}")
return EosVersion()
@property
@@ -106,7 +111,7 @@ class EosVersion(BaseModel):
Returns:
str: branch from version
"""
- return f'{self.major}.{self.minor}'
+ return f"{self.major}.{self.minor}"
def __str__(self) -> str:
"""
@@ -118,8 +123,8 @@ class EosVersion(BaseModel):
str: A standard EOS version string representing <MAJOR>.<MINOR>.<PATCH><RTYPE>
"""
if self.other is None:
- return f'{self.major}.{self.minor}.{self.patch}{self.rtype}'
- return f'{self.major}.{self.minor}.{self.patch}{self.other}{self.rtype}'
+ return f"{self.major}.{self.minor}.{self.patch}{self.rtype}"
+ return f"{self.major}.{self.minor}.{self.patch}{self.other}{self.rtype}"
def _compare(self, other: EosVersion) -> float:
"""
@@ -141,58 +146,68 @@ class EosVersion(BaseModel):
float: -1 if ver1 < ver2, 0 if ver1 == ver2, 1 if ver1 > ver2
"""
if not isinstance(other, EosVersion):
- raise ValueError(f'could not compare {other} as it is not an EosVersion object')
+ raise ValueError(
+ f"could not compare {other} as it is not an EosVersion object"
+ )
comparison_flag: float = 0
- logger.warning(f'current version {self.__str__()} - other {str(other)}') # pylint: disable = unnecessary-dunder-call
+ logger.warning(
+ f"current version {self.__str__()} - other {str(other)}" # pylint: disable = unnecessary-dunder-call
+ )
for key, _ in self.dict().items():
- if comparison_flag == 0 and self.dict()[key] is None or other.dict()[key] is None:
- logger.debug(f'{key}: local None - remote None')
- logger.debug(f'{key}: local {self.dict()} - remote {other.dict()}')
+ if (
+ comparison_flag == 0
+ and self.dict()[key] is None
+ or other.dict()[key] is None
+ ):
+ logger.debug(f"{key}: local None - remote None")
+ logger.debug(f"{key}: local {self.dict()} - remote {other.dict()}")
return comparison_flag
- logger.debug(f'{key}: local {self.dict()[key]} - remote {other.dict()[key]}')
+ logger.debug(
+ f"{key}: local {self.dict()[key]} - remote {other.dict()[key]}"
+ )
if comparison_flag == 0 and self.dict()[key] < other.dict()[key]:
comparison_flag = -1
if comparison_flag == 0 and self.dict()[key] > other.dict()[key]:
comparison_flag = 1
if comparison_flag != 0:
- logger.info(f'comparison result is {comparison_flag}')
+ logger.info(f"comparison result is {comparison_flag}")
return comparison_flag
- logger.info(f'comparison result is {comparison_flag}')
+ logger.info(f"comparison result is {comparison_flag}")
return comparison_flag
@typing.no_type_check
def __eq__(self, other):
- """ Implement __eq__ function (==) """
+ """Implement __eq__ function (==)"""
return self._compare(other) == 0
@typing.no_type_check
def __ne__(self, other):
# type: ignore
- """ Implement __nw__ function (!=) """
+ """Implement __nw__ function (!=)"""
return self._compare(other) != 0
@typing.no_type_check
def __lt__(self, other):
# type: ignore
- """ Implement __lt__ function (<) """
+ """Implement __lt__ function (<)"""
return self._compare(other) < 0
@typing.no_type_check
def __le__(self, other):
# type: ignore
- """ Implement __le__ function (<=) """
+ """Implement __le__ function (<=)"""
return self._compare(other) <= 0
@typing.no_type_check
def __gt__(self, other):
# type: ignore
- """ Implement __gt__ function (>) """
+ """Implement __gt__ function (>)"""
return self._compare(other) > 0
@typing.no_type_check
def __ge__(self, other):
# type: ignore
- """ Implement __ge__ function (>=) """
+ """Implement __ge__ function (>=)"""
return self._compare(other) >= 0
def match(self, match_expr: str) -> bool:
@@ -236,7 +251,7 @@ class EosVersion(BaseModel):
"['<', '>', '==', '<=', '>=', '!=']. "
f"You provided: {match_expr}"
)
- logger.debug(f'work on comparison {prefix} with base release {match_version}')
+ logger.debug(f"work on comparison {prefix} with base release {match_version}")
possibilities_dict = {
">": (1,),
"<": (-1,),
@@ -263,7 +278,7 @@ class EosVersion(BaseModel):
bool: True if current version is in provided branch, otherwise False
"""
try:
- logger.debug(f'reading branch str:{branch_str}')
+ logger.debug(f"reading branch str:{branch_str}")
branch = EosVersion.from_str(branch_str)
except Exception as error: # pylint: disable = broad-exception-caught
logger.error(exc_to_str(error))
diff --git a/eos_downloader/object_downloader.py b/eos_downloader/object_downloader.py
index 0420acb..d7b1418 100644
--- a/eos_downloader/object_downloader.py
+++ b/eos_downloader/object_downloader.py
@@ -8,8 +8,13 @@
eos_downloader class definition
"""
-from __future__ import (absolute_import, division, print_function,
- unicode_literals, annotations)
+from __future__ import (
+ absolute_import,
+ annotations,
+ division,
+ print_function,
+ unicode_literals,
+)
import base64
import glob
@@ -26,9 +31,14 @@ from loguru import logger
from rich import console
from tqdm import tqdm
-from eos_downloader import (ARISTA_DOWNLOAD_URL, ARISTA_GET_SESSION,
- ARISTA_SOFTWARE_FOLDER_TREE, EVE_QEMU_FOLDER_PATH,
- MSG_INVALID_DATA, MSG_TOKEN_EXPIRED)
+from eos_downloader import (
+ ARISTA_DOWNLOAD_URL,
+ ARISTA_GET_SESSION,
+ ARISTA_SOFTWARE_FOLDER_TREE,
+ EVE_QEMU_FOLDER_PATH,
+ MSG_INVALID_DATA,
+ MSG_TOKEN_EXPIRED,
+)
from eos_downloader.data import DATA_MAPPING
from eos_downloader.download import DownloadProgressBar
@@ -37,11 +47,19 @@ from eos_downloader.download import DownloadProgressBar
console = rich.get_console()
-class ObjectDownloader():
+class ObjectDownloader:
"""
ObjectDownloader Generic Object to download from Arista.com
"""
- def __init__(self, image: str, version: str, token: str, software: str = 'EOS', hash_method: str = 'md5sum'):
+
+ def __init__(
+ self,
+ image: str,
+ version: str,
+ token: str,
+ software: str = "EOS",
+ hash_method: str = "md5sum",
+ ):
"""
__init__ Class constructor
@@ -70,10 +88,10 @@ class ObjectDownloader():
self.hash_method = hash_method
self.timeout = 5
# Logging
- logger.debug(f'Filename built by _build_filename is {self.filename}')
+ logger.debug(f"Filename built by _build_filename is {self.filename}")
def __str__(self) -> str:
- return f'{self.software} - {self.image} - {self.version}'
+ return f"{self.software} - {self.image} - {self.version}"
# def __repr__(self):
# return str(self.__dict__)
@@ -102,16 +120,18 @@ class ObjectDownloader():
str:
Filename to search for on Arista.com
"""
- logger.info('start build')
+ logger.info("start build")
if self.software in DATA_MAPPING:
- logger.info(f'software in data mapping: {self.software}')
+ logger.info(f"software in data mapping: {self.software}")
if self.image in DATA_MAPPING[self.software]:
- logger.info(f'image in data mapping: {self.image}')
+ logger.info(f"image in data mapping: {self.image}")
return f"{DATA_MAPPING[self.software][self.image]['prepend']}-{self.version}{DATA_MAPPING[self.software][self.image]['extension']}"
return f"{DATA_MAPPING[self.software]['default']['prepend']}-{self.version}{DATA_MAPPING[self.software]['default']['extension']}"
- raise ValueError(f'Incorrect value for software {self.software}')
+ raise ValueError(f"Incorrect value for software {self.software}")
- def _parse_xml_for_path(self, root_xml: ET.ElementTree, xpath: str, search_file: str) -> str:
+ def _parse_xml_for_path(
+ self, root_xml: ET.ElementTree, xpath: str, search_file: str
+ ) -> str:
# sourcery skip: remove-unnecessary-cast
"""
_parse_xml Read and extract data from XML using XPATH
@@ -132,18 +152,18 @@ class ObjectDownloader():
str
File Path on Arista server side
"""
- logger.debug(f'Using xpath {xpath}')
- logger.debug(f'Search for file {search_file}')
- console.print(f'šŸ”Ž Searching file {search_file}')
+ logger.debug(f"Using xpath {xpath}")
+ logger.debug(f"Search for file {search_file}")
+ console.print(f"šŸ”Ž Searching file {search_file}")
for node in root_xml.findall(xpath):
# logger.debug('Found {}', node.text)
if str(node.text).lower() == search_file.lower():
- path = node.get('path')
- console.print(f' -> Found file at {path}')
+ path = node.get("path")
+ console.print(f" -> Found file at {path}")
logger.info(f'Found {node.text} at {node.get("path")}')
- return str(node.get('path')) if node.get('path') is not None else ''
- logger.error(f'Requested file ({self.filename}) not found !')
- return ''
+ return str(node.get("path")) if node.get("path") is not None else ""
+ logger.error(f"Requested file ({self.filename}) not found !")
+ return ""
def _get_hash(self, file_path: str) -> str:
"""
@@ -165,10 +185,10 @@ class ObjectDownloader():
dl_rich_progress_bar = DownloadProgressBar()
dl_rich_progress_bar.download(urls=[hash_url], dest_dir=file_path)
hash_downloaded = f"{file_path}/{os.path.basename(remote_hash_file)}"
- hash_content = 'unset'
- with open(hash_downloaded, 'r', encoding='utf-8') as f:
+ hash_content = "unset"
+ with open(hash_downloaded, "r", encoding="utf-8") as f:
hash_content = f.read()
- return hash_content.split(' ')[0]
+ return hash_content.split(" ")[0]
@staticmethod
def _compute_hash_md5sum(file: str, hash_expected: str) -> bool:
@@ -195,7 +215,9 @@ class ObjectDownloader():
hash_md5.update(chunk)
if hash_md5.hexdigest() == hash_expected:
return True
- logger.warning(f'Downloaded file is corrupt: local md5 ({hash_md5.hexdigest()}) is different to md5 from arista ({hash_expected})')
+ logger.warning(
+ f"Downloaded file is corrupt: local md5 ({hash_md5.hexdigest()}) is different to md5 from arista ({hash_expected})"
+ )
return False
@staticmethod
@@ -223,10 +245,12 @@ class ObjectDownloader():
hash_sha512.update(chunk)
if hash_sha512.hexdigest() == hash_expected:
return True
- logger.warning(f'Downloaded file is corrupt: local sha512 ({hash_sha512.hexdigest()}) is different to sha512 from arista ({hash_expected})')
+ logger.warning(
+ f"Downloaded file is corrupt: local sha512 ({hash_sha512.hexdigest()}) is different to sha512 from arista ({hash_expected})"
+ )
return False
- def _get_folder_tree(self) -> ET.ElementTree:
+ def get_folder_tree(self) -> ET.ElementTree:
"""
_get_folder_tree Download XML tree from Arista server
@@ -237,15 +261,17 @@ class ObjectDownloader():
"""
if self.session_id is None:
self.authenticate()
- jsonpost = {'sessionCode': self.session_id}
- result = requests.post(ARISTA_SOFTWARE_FOLDER_TREE, data=json.dumps(jsonpost), timeout=self.timeout)
+ jsonpost = {"sessionCode": self.session_id}
+ result = requests.post(
+ ARISTA_SOFTWARE_FOLDER_TREE, data=json.dumps(jsonpost), timeout=self.timeout
+ )
try:
folder_tree = result.json()["data"]["xml"]
return ET.ElementTree(ET.fromstring(folder_tree))
except KeyError as error:
logger.error(MSG_INVALID_DATA)
- logger.error(f'Server returned: {error}')
- console.print(f'āŒ {MSG_INVALID_DATA}', style="bold red")
+ logger.error(f"Server returned: {error}")
+ console.print(f"āŒ {MSG_INVALID_DATA}", style="bold red")
sys.exit(1)
def _get_remote_filepath(self) -> str:
@@ -259,12 +285,14 @@ class ObjectDownloader():
str
Remote path of the file to download
"""
- root = self._get_folder_tree()
+ root = self.get_folder_tree()
logger.debug("GET XML content from ARISTA.com")
xpath = f'.//dir[@label="{self.software}"]//file'
- return self._parse_xml_for_path(root_xml=root, xpath=xpath, search_file=self.filename)
+ return self._parse_xml_for_path(
+ root_xml=root, xpath=xpath, search_file=self.filename
+ )
- def _get_remote_hashpath(self, hash_method: str = 'md5sum') -> str:
+ def _get_remote_hashpath(self, hash_method: str = "md5sum") -> str:
"""
_get_remote_hashpath Helper to get path of the hash's file to download
@@ -275,16 +303,16 @@ class ObjectDownloader():
str
Remote path of the hash's file to download
"""
- root = self._get_folder_tree()
+ root = self.get_folder_tree()
logger.debug("GET XML content from ARISTA.com")
xpath = f'.//dir[@label="{self.software}"]//file'
return self._parse_xml_for_path(
root_xml=root,
xpath=xpath,
- search_file=f'{self.filename}.{hash_method}',
+ search_file=f"{self.filename}.{hash_method}",
)
- def _get_url(self, remote_file_path: str) -> str:
+ def _get_url(self, remote_file_path: str) -> str:
"""
_get_url Get URL to use for downloading file from Arista server
@@ -302,13 +330,15 @@ class ObjectDownloader():
"""
if self.session_id is None:
self.authenticate()
- jsonpost = {'sessionCode': self.session_id, 'filePath': remote_file_path}
- result = requests.post(ARISTA_DOWNLOAD_URL, data=json.dumps(jsonpost), timeout=self.timeout)
- if 'data' in result.json() and 'url' in result.json()['data']:
+ jsonpost = {"sessionCode": self.session_id, "filePath": remote_file_path}
+ result = requests.post(
+ ARISTA_DOWNLOAD_URL, data=json.dumps(jsonpost), timeout=self.timeout
+ )
+ if "data" in result.json() and "url" in result.json()["data"]:
# logger.debug('URL to download file is: {}', result.json())
return result.json()["data"]["url"]
- logger.critical(f'Server returns following message: {result.json()}')
- return ''
+ logger.critical(f"Server returns following message: {result.json()}")
+ return ""
@staticmethod
def _download_file_raw(url: str, file_path: str) -> str:
@@ -331,31 +361,40 @@ class ObjectDownloader():
"""
chunkSize = 1024
r = requests.get(url, stream=True, timeout=5)
- with open(file_path, 'wb') as f:
- pbar = tqdm(unit="B", total=int(r.headers['Content-Length']), unit_scale=True, unit_divisor=1024)
+ with open(file_path, "wb") as f:
+ pbar = tqdm(
+ unit="B",
+ total=int(r.headers["Content-Length"]),
+ unit_scale=True,
+ unit_divisor=1024,
+ )
for chunk in r.iter_content(chunk_size=chunkSize):
if chunk:
pbar.update(len(chunk))
f.write(chunk)
return file_path
- def _download_file(self, file_path: str, filename: str, rich_interface: bool = True) -> Union[None, str]:
+ def _download_file(
+ self, file_path: str, filename: str, rich_interface: bool = True
+ ) -> Union[None, str]:
remote_file_path = self._get_remote_filepath()
- logger.info(f'File found on arista server: {remote_file_path}')
+ logger.info(f"File found on arista server: {remote_file_path}")
file_url = self._get_url(remote_file_path=remote_file_path)
if file_url is not False:
if not rich_interface:
- return self._download_file_raw(url=file_url, file_path=os.path.join(file_path, filename))
+ return self._download_file_raw(
+ url=file_url, file_path=os.path.join(file_path, filename)
+ )
rich_downloader = DownloadProgressBar()
rich_downloader.download(urls=[file_url], dest_dir=file_path)
return os.path.join(file_path, filename)
- logger.error(f'Cannot download file {file_path}')
+ logger.error(f"Cannot download file {file_path}")
return None
@staticmethod
def _create_destination_folder(path: str) -> None:
# os.makedirs(path, mode, exist_ok=True)
- os.system(f'mkdir -p {path}')
+ os.system(f"mkdir -p {path}")
@staticmethod
def _disable_ztp(file_path: str) -> None:
@@ -379,24 +418,29 @@ class ObjectDownloader():
"""
credentials = (base64.b64encode(self.token.encode())).decode("utf-8")
session_code_url = ARISTA_GET_SESSION
- jsonpost = {'accessToken': credentials}
+ jsonpost = {"accessToken": credentials}
- result = requests.post(session_code_url, data=json.dumps(jsonpost), timeout=self.timeout)
+ result = requests.post(
+ session_code_url, data=json.dumps(jsonpost), timeout=self.timeout
+ )
- if result.json()["status"]["message"] in[ 'Access token expired', 'Invalid access token']:
- console.print(f'āŒ {MSG_TOKEN_EXPIRED}', style="bold red")
+ if result.json()["status"]["message"] in [
+ "Access token expired",
+ "Invalid access token",
+ ]:
+ console.print(f"āŒ {MSG_TOKEN_EXPIRED}", style="bold red")
logger.error(MSG_TOKEN_EXPIRED)
return False
try:
- if 'data' in result.json():
+ if "data" in result.json():
self.session_id = result.json()["data"]["session_code"]
- logger.info('Authenticated on arista.com')
+ logger.info("Authenticated on arista.com")
return True
- logger.debug(f'{result.json()}')
+ logger.debug(f"{result.json()}")
return False
except KeyError as error_arista:
- logger.error(f'Error: {error_arista}')
+ logger.error(f"Error: {error_arista}")
sys.exit(1)
def download_local(self, file_path: str, checksum: bool = False) -> bool:
@@ -422,25 +466,33 @@ class ObjectDownloader():
bool
True if everything went well, False if any problem appears
"""
- file_downloaded = str(self._download_file(file_path=file_path, filename=self.filename))
+ file_downloaded = str(
+ self._download_file(file_path=file_path, filename=self.filename)
+ )
# Check file HASH
hash_result = False
if checksum:
- logger.info('šŸš€ Running checksum validation')
- console.print('šŸš€ Running checksum validation')
- if self.hash_method == 'md5sum':
+ logger.info("šŸš€ Running checksum validation")
+ console.print("šŸš€ Running checksum validation")
+ if self.hash_method == "md5sum":
hash_expected = self._get_hash(file_path=file_path)
- hash_result = self._compute_hash_md5sum(file=file_downloaded, hash_expected=hash_expected)
- elif self.hash_method == 'sha512sum':
+ hash_result = self._compute_hash_md5sum(
+ file=file_downloaded, hash_expected=hash_expected
+ )
+ elif self.hash_method == "sha512sum":
hash_expected = self._get_hash(file_path=file_path)
- hash_result = self._compute_hash_sh512sum(file=file_downloaded, hash_expected=hash_expected)
+ hash_result = self._compute_hash_sh512sum(
+ file=file_downloaded, hash_expected=hash_expected
+ )
if not hash_result:
- logger.error('Downloaded file is corrupted, please check your connection')
- console.print('āŒ Downloaded file is corrupted, please check your connection')
+ logger.error("Downloaded file is corrupted, please check your connection")
+ console.print(
+ "āŒ Downloaded file is corrupted, please check your connection"
+ )
return False
- logger.info('Downloaded file is correct.')
- console.print('āœ… Downloaded file is correct.')
+ logger.info("Downloaded file is correct.")
+ console.print("āœ… Downloaded file is correct.")
return True
def provision_eve(self, noztp: bool = False, checksum: bool = True) -> None:
@@ -466,7 +518,7 @@ class ObjectDownloader():
# Build image name to use in folder path
eos_image_name = self.filename.rstrip(".vmdk").lower()
if noztp:
- eos_image_name = f'{eos_image_name}-noztp'
+ eos_image_name = f"{eos_image_name}-noztp"
# Create full path for EVE-NG
file_path = os.path.join(EVE_QEMU_FOLDER_PATH, eos_image_name.rstrip())
# Create folders in filesystem
@@ -474,20 +526,23 @@ class ObjectDownloader():
# Download file to local destination
file_downloaded = self._download_file(
- file_path=file_path, filename=self.filename)
+ file_path=file_path, filename=self.filename
+ )
# Convert to QCOW2 format
file_qcow2 = os.path.join(file_path, "hda.qcow2")
- logger.info('Converting VMDK to QCOW2 format')
- console.print('šŸš€ Converting VMDK to QCOW2 format...')
+ logger.info("Converting VMDK to QCOW2 format")
+ console.print("šŸš€ Converting VMDK to QCOW2 format...")
- os.system(f'$(which qemu-img) convert -f vmdk -O qcow2 {file_downloaded} {file_qcow2}')
+ os.system(
+ f"$(which qemu-img) convert -f vmdk -O qcow2 {file_downloaded} {file_qcow2}"
+ )
- logger.info('Applying unl_wrapper to fix permissions')
- console.print('Applying unl_wrapper to fix permissions')
+ logger.info("Applying unl_wrapper to fix permissions")
+ console.print("Applying unl_wrapper to fix permissions")
- os.system('/opt/unetlab/wrappers/unl_wrapper -a fixpermissions')
- os.system(f'rm -f {file_downloaded}')
+ os.system("/opt/unetlab/wrappers/unl_wrapper -a fixpermissions")
+ os.system(f"rm -f {file_downloaded}")
if noztp:
self._disable_ztp(file_path=file_path)
@@ -502,12 +557,12 @@ class ObjectDownloader():
version (str):
image_name (str, optional): Image name to use. Defaults to "arista/ceos".
"""
- docker_image = f'{image_name}:{self.version}'
- logger.info(f'Importing image {self.filename} to {docker_image}')
- console.print(f'šŸš€ Importing image {self.filename} to {docker_image}')
- os.system(f'$(which docker) import {self.filename} {docker_image}')
- for filename in glob.glob(f'{self.filename}*'):
+ docker_image = f"{image_name}:{self.version}"
+ logger.info(f"Importing image {self.filename} to {docker_image}")
+ console.print(f"šŸš€ Importing image {self.filename} to {docker_image}")
+ os.system(f"$(which docker) import {self.filename} {docker_image}")
+ for filename in glob.glob(f"{self.filename}*"):
try:
os.remove(filename)
except FileNotFoundError:
- console.print(f'File not found: {filename}')
+ console.print(f"File not found: {filename}")
diff --git a/pylintrc b/pylintrc
index a9fbc4a..dd7dfff 100644
--- a/pylintrc
+++ b/pylintrc
@@ -2,7 +2,8 @@
disable=
invalid-name,
logging-fstring-interpolation,
- fixme
+ fixme,
+ line-too-long
[BASIC]
good-names=runCmds, i, y, t, c, x, e, fd, ip, v
diff --git a/pyproject.toml b/pyproject.toml
index 1fa01f9..f98c4ee 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "eos_downloader"
-version = "v0.8.2"
+version = "v0.9.0"
readme = "README.md"
authors = [{ name = "Thomas Grimonet", email = "thomas.grimonet@gmail.com" }]
maintainers = [
@@ -22,7 +22,7 @@ dependencies = [
"scp",
"tqdm",
"loguru",
- "rich~=13.5.2",
+ "rich>=13.5.2,<13.7.0",
"cvprac>=1.0.7",
"click~=8.1.6",
"click-help-colors~=0.9",
@@ -62,7 +62,7 @@ dev = [
"pytest-html>=3.1.1",
"pytest-metadata>=1.11.0",
"pylint-pydantic>=0.2.4",
- "tox==4.10.0",
+ "tox~=4.11",
"types-PyYAML",
"types-paramiko",
"types-requests",
@@ -94,7 +94,7 @@ namespaces = false
# Version
################################
[tool.bumpver]
-current_version = "0.8.2"
+current_version = "0.9.0"
version_pattern = "MAJOR.MINOR.PATCH"
commit_message = "bump: Version {old_version} -> {new_version}"
commit = true
diff --git a/tests/lib/dataset.py b/tests/lib/dataset.py
index 1286456..34175e9 100644
--- a/tests/lib/dataset.py
+++ b/tests/lib/dataset.py
@@ -5,12 +5,13 @@
# flake8: noqa: W503
# flake8: noqa: W1202
-from __future__ import (absolute_import, division, print_function)
+from __future__ import absolute_import, division, print_function
+
import os
+
import eos_downloader
-from eos_downloader.eos import EOSDownloader
from eos_downloader.data import DATA_MAPPING
-
+from eos_downloader.eos import EOSDownloader
# --------------------------------------------------------------- #
# MOOCK data to use for testing
@@ -18,99 +19,99 @@ from eos_downloader.data import DATA_MAPPING
# Get Auth token
# eos_token = os.getenv('ARISTA_TOKEN')
-eos_token = os.getenv('ARISTA_TOKEN', 'invalid_token')
-eos_token_invalid = 'invalid_token'
+eos_token = os.getenv("ARISTA_TOKEN", "invalid_token")
+eos_token_invalid = "invalid_token"
eos_dataset_valid = [
{
- 'image': 'EOS',
- 'version': '4.26.3M',
- 'software': 'EOS',
- 'filename': 'EOS-4.26.3M.swi',
- 'expected_hash': 'sha512sum',
- 'remote_path': '/support/download/EOS-USA/Active Releases/4.26/EOS-4.26.3M/EOS-4.26.3M.swi',
- 'compute_checksum': True
+ "image": "EOS",
+ "version": "4.26.3M",
+ "software": "EOS",
+ "filename": "EOS-4.26.3M.swi",
+ "expected_hash": "sha512sum",
+ "remote_path": "/support/download/EOS-USA/Active Releases/4.26/EOS-4.26.3M/EOS-4.26.3M.swi",
+ "compute_checksum": True,
},
{
- 'image': 'EOS',
- 'version': '4.25.6M',
- 'software': 'EOS',
- 'filename': 'EOS-4.25.6M.swi',
- 'expected_hash': 'md5sum',
- 'remote_path': '/support/download/EOS-USA/Active Releases/4.25/EOS-4.25.6M/EOS-4.25.6M.swi',
- 'compute_checksum': True
+ "image": "EOS",
+ "version": "4.25.6M",
+ "software": "EOS",
+ "filename": "EOS-4.25.6M.swi",
+ "expected_hash": "md5sum",
+ "remote_path": "/support/download/EOS-USA/Active Releases/4.25/EOS-4.25.6M/EOS-4.25.6M.swi",
+ "compute_checksum": True,
},
{
- 'image': 'vEOS-lab',
- 'version': '4.25.6M',
- 'software': 'EOS',
- 'filename': 'vEOS-lab-4.25.6M.vmdk',
- 'expected_hash': 'md5sum',
- 'remote_path': '/support/download/EOS-USA/Active Releases/4.25/EOS-4.25.6M/vEOS-lab/vEOS-lab-4.25.6M.vmdk',
- 'compute_checksum': False
- }
+ "image": "vEOS-lab",
+ "version": "4.25.6M",
+ "software": "EOS",
+ "filename": "vEOS-lab-4.25.6M.vmdk",
+ "expected_hash": "md5sum",
+ "remote_path": "/support/download/EOS-USA/Active Releases/4.25/EOS-4.25.6M/vEOS-lab/vEOS-lab-4.25.6M.vmdk",
+ "compute_checksum": False,
+ },
]
eos_dataset_invalid = [
{
- 'image': 'default',
- 'version': '4.26.3M',
- 'software': 'EOS',
- 'filename': 'EOS-4.26.3M.swi',
- 'expected_hash': 'sha512sum',
- 'remote_path': '/support/download/EOS-USA/Active Releases/4.26/EOS-4.26.3M/EOS-4.26.3M.swi',
- 'compute_checksum': True
+ "image": "default",
+ "version": "4.26.3M",
+ "software": "EOS",
+ "filename": "EOS-4.26.3M.swi",
+ "expected_hash": "sha512sum",
+ "remote_path": "/support/download/EOS-USA/Active Releases/4.26/EOS-4.26.3M/EOS-4.26.3M.swi",
+ "compute_checksum": True,
}
]
eos_version = [
{
- 'version': 'EOS-4.23.1F',
- 'is_valid': True,
- 'major': 4,
- 'minor': 23,
- 'patch': 1,
- 'rtype': 'F'
+ "version": "EOS-4.23.1F",
+ "is_valid": True,
+ "major": 4,
+ "minor": 23,
+ "patch": 1,
+ "rtype": "F",
},
{
- 'version': 'EOS-4.23.0',
- 'is_valid': True,
- 'major': 4,
- 'minor': 23,
- 'patch': 0,
- 'rtype': None
+ "version": "EOS-4.23.0",
+ "is_valid": True,
+ "major": 4,
+ "minor": 23,
+ "patch": 0,
+ "rtype": None,
},
{
- 'version': 'EOS-4.23',
- 'is_valid': True,
- 'major': 4,
- 'minor': 23,
- 'patch': 0,
- 'rtype': None
+ "version": "EOS-4.23",
+ "is_valid": True,
+ "major": 4,
+ "minor": 23,
+ "patch": 0,
+ "rtype": None,
},
{
- 'version': 'EOS-4.23.1M',
- 'is_valid': True,
- 'major': 4,
- 'minor': 23,
- 'patch': 1,
- 'rtype': 'M'
+ "version": "EOS-4.23.1M",
+ "is_valid": True,
+ "major": 4,
+ "minor": 23,
+ "patch": 1,
+ "rtype": "M",
},
{
- 'version': 'EOS-4.23.1.F',
- 'is_valid': True,
- 'major': 4,
- 'minor': 23,
- 'patch': 1,
- 'rtype': 'F'
+ "version": "EOS-4.23.1.F",
+ "is_valid": True,
+ "major": 4,
+ "minor": 23,
+ "patch": 1,
+ "rtype": "F",
},
{
- 'version': 'EOS-5.23.1F',
- 'is_valid': False,
- 'major': 4,
- 'minor': 23,
- 'patch': 1,
- 'rtype': 'F'
+ "version": "EOS-5.23.1F",
+ "is_valid": False,
+ "major": 4,
+ "minor": 23,
+ "patch": 1,
+ "rtype": "F",
},
-] \ No newline at end of file
+]
diff --git a/tests/lib/fixtures.py b/tests/lib/fixtures.py
index 4515f9b..64b341b 100644
--- a/tests/lib/fixtures.py
+++ b/tests/lib/fixtures.py
@@ -5,13 +5,20 @@
# flake8: noqa: W503
# flake8: noqa: W1202
-from __future__ import (absolute_import, division, print_function)
+from __future__ import absolute_import, division, print_function
+
import os
+from typing import Any, Dict, List
+
import pytest
-import eos_downloader
-from typing import Dict, Any, List
-from tests.lib.dataset import eos_dataset_valid, eos_dataset_invalid, eos_token, eos_token_invalid
+import eos_downloader
+from tests.lib.dataset import (
+ eos_dataset_invalid,
+ eos_dataset_valid,
+ eos_token,
+ eos_token_invalid,
+)
@pytest.fixture
@@ -19,17 +26,18 @@ from tests.lib.dataset import eos_dataset_valid, eos_dataset_invalid, eos_token,
def create_download_instance(request, DOWNLOAD_INFO):
# logger.info("Execute fixture to create class elements")
request.cls.eos_downloader = eos_downloader.eos.EOSDownloader(
- image=DOWNLOAD_INFO['image'],
- software=DOWNLOAD_INFO['software'],
- version=DOWNLOAD_INFO['version'],
- token=eos_token,
- hash_method='sha512sum')
+ image=DOWNLOAD_INFO["image"],
+ software=DOWNLOAD_INFO["software"],
+ version=DOWNLOAD_INFO["version"],
+ token=eos_token,
+ hash_method="sha512sum",
+ )
yield
# logger.info('Cleanup test environment')
- os.system('rm -f {}*'.format(DOWNLOAD_INFO['filename']))
+ os.system("rm -f {}*".format(DOWNLOAD_INFO["filename"]))
-def generate_test_ids_dict(val: Dict[str, Any], key: str = 'name') -> str:
+def generate_test_ids_dict(val: Dict[str, Any], key: str = "name") -> str:
"""
generate_test_ids Helper to generate test ID for parametrize
@@ -50,7 +58,8 @@ def generate_test_ids_dict(val: Dict[str, Any], key: str = 'name') -> str:
return val[key]
return "undefined_test"
-def generate_test_ids_list(val: List[Dict[str, Any]], key: str = 'name') -> str:
+
+def generate_test_ids_list(val: List[Dict[str, Any]], key: str = "name") -> str:
"""
generate_test_ids Helper to generate test ID for parametrize
@@ -66,4 +75,4 @@ def generate_test_ids_list(val: List[Dict[str, Any]], key: str = 'name') -> str:
str
Name of the configlet
"""
- return [ entry[key] if key in entry.keys() else 'unset_entry' for entry in val ]
+ return [entry[key] if key in entry.keys() else "unset_entry" for entry in val]
diff --git a/tests/lib/helpers.py b/tests/lib/helpers.py
index 308f2a5..67e914b 100644
--- a/tests/lib/helpers.py
+++ b/tests/lib/helpers.py
@@ -5,14 +5,13 @@
# flake8: noqa: W503
# flake8: noqa: W1202
-from __future__ import (absolute_import, division, print_function)
+from __future__ import absolute_import, division, print_function
import os
from eos_downloader.data import DATA_MAPPING
-
def default_filename(version: str, info):
"""
default_filename Helper to build default filename
@@ -31,10 +30,14 @@ def default_filename(version: str, info):
"""
if version is None or info is None:
return None
- return DATA_MAPPING[info['software']]['default']['prepend'] + '-' + version + '.swi'
+ return DATA_MAPPING[info["software"]]["default"]["prepend"] + "-" + version + ".swi"
def is_on_github_actions():
"""Check if code is running on a CI runner"""
- if "CI" not in os.environ or not os.environ["CI"] or "GITHUB_RUN_ID" not in os.environ:
- return False \ No newline at end of file
+ if (
+ "CI" not in os.environ
+ or not os.environ["CI"]
+ or "GITHUB_RUN_ID" not in os.environ
+ ):
+ return False
diff --git a/tests/system/test_eos_download.py.old b/tests/system/test_eos_download.py.old
index 6ae56fe..91e60a5 100644
--- a/tests/system/test_eos_download.py.old
+++ b/tests/system/test_eos_download.py.old
@@ -45,4 +45,3 @@ class TestEosDownload_valid():
@pytest.mark.eos_download
def test_download_local(self, DOWNLOAD_INFO):
self.eos_downloader.download_local(file_path='.', checksum=DOWNLOAD_INFO['compute_checksum'])
-
diff --git a/tests/unit/test_eos_version.py b/tests/unit/test_eos_version.py
index 1b97ffc..82f1269 100644
--- a/tests/unit/test_eos_version.py
+++ b/tests/unit/test_eos_version.py
@@ -5,126 +5,166 @@
# flake8: noqa: W503
# flake8: noqa: W1202
-from __future__ import (absolute_import, division, print_function)
+from __future__ import absolute_import, division, print_function
import sys
-from loguru import logger
+
import pytest
-from eos_downloader.models.version import EosVersion, BASE_VERSION_STR
+from loguru import logger
+
+from eos_downloader.models.version import BASE_VERSION_STR, EosVersion
from tests.lib.dataset import eos_version
from tests.lib.fixtures import generate_test_ids_list
logger.remove()
logger.add(sys.stderr, level="DEBUG")
-@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
+
+@pytest.mark.parametrize(
+ "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
+)
def test_eos_version_from_str(EOS_VERSION):
- version = EosVersion.from_str(EOS_VERSION['version'])
- if EOS_VERSION['is_valid']:
- assert version.major == EOS_VERSION['major']
- assert version.minor == EOS_VERSION['minor']
- assert version.patch == EOS_VERSION['patch']
- assert version.rtype == EOS_VERSION['rtype']
+ version = EosVersion.from_str(EOS_VERSION["version"])
+ if EOS_VERSION["is_valid"]:
+ assert version.major == EOS_VERSION["major"]
+ assert version.minor == EOS_VERSION["minor"]
+ assert version.patch == EOS_VERSION["patch"]
+ assert version.rtype == EOS_VERSION["rtype"]
else:
assert str(version) == BASE_VERSION_STR
-@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
+@pytest.mark.parametrize(
+ "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
+)
def test_eos_version_to_str(EOS_VERSION):
version = EosVersion(**EOS_VERSION)
- if EOS_VERSION['is_valid']:
- assert version.major == EOS_VERSION['major']
- assert version.minor == EOS_VERSION['minor']
- assert version.patch == EOS_VERSION['patch']
- assert version.rtype == EOS_VERSION['rtype']
+ if EOS_VERSION["is_valid"]:
+ assert version.major == EOS_VERSION["major"]
+ assert version.minor == EOS_VERSION["minor"]
+ assert version.patch == EOS_VERSION["patch"]
+ assert version.rtype == EOS_VERSION["rtype"]
+
-@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
+@pytest.mark.parametrize(
+ "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
+)
def test_eos_version_branch(EOS_VERSION):
- if EOS_VERSION['is_valid']:
+ if EOS_VERSION["is_valid"]:
version = EosVersion(**EOS_VERSION)
assert version.branch == f'{EOS_VERSION["major"]}.{EOS_VERSION["minor"]}'
-@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
+
+@pytest.mark.parametrize(
+ "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
+)
def test_eos_version_eq_operator(EOS_VERSION):
- if not EOS_VERSION['is_valid']:
- pytest.skip('not a valid version to test')
+ if not EOS_VERSION["is_valid"]:
+ pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION)
- logger.warning(f'version is: {version.dict()}')
+ logger.warning(f"version is: {version.dict()}")
assert version == version
-@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
+
+@pytest.mark.parametrize(
+ "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
+)
def test_eos_version_ge_operator(EOS_VERSION):
- if not EOS_VERSION['is_valid']:
- pytest.skip('not a valid version to test')
+ if not EOS_VERSION["is_valid"]:
+ pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION)
version_b = EosVersion.from_str(BASE_VERSION_STR)
assert version >= version_b
-@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
+
+@pytest.mark.parametrize(
+ "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
+)
def test_eos_version_gs_operator(EOS_VERSION):
- if not EOS_VERSION['is_valid']:
- pytest.skip('not a valid version to test')
+ if not EOS_VERSION["is_valid"]:
+ pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION)
version_b = EosVersion.from_str(BASE_VERSION_STR)
assert version > version_b
-@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
+
+@pytest.mark.parametrize(
+ "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
+)
def test_eos_version_le_operator(EOS_VERSION):
- if not EOS_VERSION['is_valid']:
- pytest.skip('not a valid version to test')
+ if not EOS_VERSION["is_valid"]:
+ pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION)
version_b = EosVersion.from_str(BASE_VERSION_STR)
assert version_b <= version
-@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
+
+@pytest.mark.parametrize(
+ "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
+)
def test_eos_version_ls_operator(EOS_VERSION):
- if not EOS_VERSION['is_valid']:
- pytest.skip('not a valid version to test')
+ if not EOS_VERSION["is_valid"]:
+ pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION)
version_b = EosVersion.from_str(BASE_VERSION_STR)
assert version_b < version
-@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
+
+@pytest.mark.parametrize(
+ "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
+)
def test_eos_version_ne_operator(EOS_VERSION):
- if not EOS_VERSION['is_valid']:
- pytest.skip('not a valid version to test')
+ if not EOS_VERSION["is_valid"]:
+ pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION)
version_b = EosVersion.from_str(BASE_VERSION_STR)
assert version_b != version
-@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
+
+@pytest.mark.parametrize(
+ "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
+)
def test_eos_version_match(EOS_VERSION):
- if not EOS_VERSION['is_valid']:
- pytest.skip('not a valid version to test')
+ if not EOS_VERSION["is_valid"]:
+ pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION)
assert version.match(f'=={EOS_VERSION["version"]}')
- assert version.match(f'!={BASE_VERSION_STR}')
- assert version.match(f'>={BASE_VERSION_STR}')
- assert version.match(f'>{BASE_VERSION_STR}')
- assert version.match('<=4.99.0F')
- assert version.match('<4.99.0F')
+ assert version.match(f"!={BASE_VERSION_STR}")
+ assert version.match(f">={BASE_VERSION_STR}")
+ assert version.match(f">{BASE_VERSION_STR}")
+ assert version.match("<=4.99.0F")
+ assert version.match("<4.99.0F")
+
-@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
+@pytest.mark.parametrize(
+ "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
+)
def test_eos_version_is_in_branch(EOS_VERSION):
- if not EOS_VERSION['is_valid']:
- pytest.skip('not a valid version to test')
+ if not EOS_VERSION["is_valid"]:
+ pytest.skip("not a valid version to test")
version = EosVersion(**EOS_VERSION)
assert version.is_in_branch(f"{EOS_VERSION['major']}.{EOS_VERSION['minor']}")
-@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
+
+@pytest.mark.parametrize(
+ "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
+)
def test_eos_version_match_exception(EOS_VERSION):
- if not EOS_VERSION['is_valid']:
- pytest.skip('not a valid version to test')
+ if not EOS_VERSION["is_valid"]:
+ pytest.skip("not a valid version to test")
with pytest.raises(Exception) as e_info:
version = EosVersion(**EOS_VERSION)
assert version.match(f'+={EOS_VERSION["version"]}')
- logger.info(f'receive exception: {e_info}')
+ logger.info(f"receive exception: {e_info}")
+
-@pytest.mark.parametrize("EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version,key='version'))
+@pytest.mark.parametrize(
+ "EOS_VERSION", eos_version, ids=generate_test_ids_list(eos_version, key="version")
+)
def test_eos_version_compare_exception(EOS_VERSION):
- if not EOS_VERSION['is_valid']:
- pytest.skip('not a valid version to test')
+ if not EOS_VERSION["is_valid"]:
+ pytest.skip("not a valid version to test")
with pytest.raises(Exception) as e_info:
version = EosVersion(**EOS_VERSION)
version._compare(BASE_VERSION_STR)
- logger.info(f'receive exception: {e_info}')
+ logger.info(f"receive exception: {e_info}")
diff --git a/tests/unit/test_object_downloader.py b/tests/unit/test_object_downloader.py
index 8237b1c..d178dfc 100644
--- a/tests/unit/test_object_downloader.py
+++ b/tests/unit/test_object_downloader.py
@@ -14,30 +14,41 @@ from loguru import logger
import eos_downloader
from eos_downloader.data import DATA_MAPPING
from eos_downloader.eos import EOSDownloader
-from tests.lib.dataset import eos_dataset_invalid, eos_dataset_valid, eos_token, eos_token_invalid
+from tests.lib.dataset import (
+ eos_dataset_invalid,
+ eos_dataset_valid,
+ eos_token,
+ eos_token_invalid,
+)
from tests.lib.fixtures import create_download_instance
from tests.lib.helpers import default_filename, is_on_github_actions
logger.remove()
logger.add(sys.stderr, level="DEBUG")
+
@pytest.mark.usefixtures("create_download_instance")
-@pytest.mark.parametrize("DOWNLOAD_INFO", eos_dataset_valid, ids=['EOS-sha512', 'EOS-md5' ,'vEOS-lab-no-hash'])
+@pytest.mark.parametrize(
+ "DOWNLOAD_INFO",
+ eos_dataset_valid,
+ ids=["EOS-sha512", "EOS-md5", "vEOS-lab-no-hash"],
+)
@pytest.mark.eos_download
-class TestEosDownload_valid():
+class TestEosDownload_valid:
def test_data(self, DOWNLOAD_INFO):
- logger.info(f'test input: {DOWNLOAD_INFO}')
- logger.info(f'test build: {self.eos_downloader.__dict__}')
+ logger.info(f"test input: {DOWNLOAD_INFO}")
+ logger.info(f"test build: {self.eos_downloader.__dict__}")
def test_eos_download_create(self, DOWNLOAD_INFO):
my_download = eos_downloader.eos.EOSDownloader(
- image=DOWNLOAD_INFO['image'],
- software=DOWNLOAD_INFO['software'],
- version=DOWNLOAD_INFO['version'],
+ image=DOWNLOAD_INFO["image"],
+ software=DOWNLOAD_INFO["software"],
+ version=DOWNLOAD_INFO["version"],
token=eos_token,
- hash_method='sha512sum')
+ hash_method="sha512sum",
+ )
logger.info(my_download)
- assert isinstance(my_download, eos_downloader.eos.EOSDownloader)
+ assert isinstance(my_download, eos_downloader.eos.EOSDownloader)
def test_eos_download_repr_string(self, DOWNLOAD_INFO):
expected = f"{DOWNLOAD_INFO['software']} - {DOWNLOAD_INFO['image']} - {DOWNLOAD_INFO['version']}"
@@ -45,47 +56,56 @@ class TestEosDownload_valid():
assert str(self.eos_downloader) == expected
def test_eos_download_build_filename(self, DOWNLOAD_INFO):
- assert self.eos_downloader._build_filename() == DOWNLOAD_INFO['filename']
+ assert self.eos_downloader._build_filename() == DOWNLOAD_INFO["filename"]
- @pytest.mark.dependency(name='authentication')
- @pytest.mark.skipif(eos_token == eos_token_invalid, reason="Token is not set correctly")
+ @pytest.mark.dependency(name="authentication")
+ @pytest.mark.skipif(
+ eos_token == eos_token_invalid, reason="Token is not set correctly"
+ )
@pytest.mark.skipif(is_on_github_actions(), reason="Running on Github Runner")
# @pytest.mark.xfail(reason="Deliberate - CI not set for testing AUTH")
@pytest.mark.webtest
def test_eos_download_authenticate(self):
assert self.eos_downloader.authenticate() is True
- @pytest.mark.dependency(depends=["authentication"], scope='class')
+ @pytest.mark.dependency(depends=["authentication"], scope="class")
@pytest.mark.webtest
def test_eos_download_get_remote_file_path(self, DOWNLOAD_INFO):
- assert self.eos_downloader._get_remote_filepath() == DOWNLOAD_INFO['remote_path']
+ assert (
+ self.eos_downloader._get_remote_filepath() == DOWNLOAD_INFO["remote_path"]
+ )
- @pytest.mark.dependency(depends=["authentication"], scope='class')
+ @pytest.mark.dependency(depends=["authentication"], scope="class")
@pytest.mark.webtest
def test_eos_download_get_file_url(self, DOWNLOAD_INFO):
- url = self.eos_downloader._get_url(remote_file_path = DOWNLOAD_INFO['remote_path'])
+ url = self.eos_downloader._get_url(
+ remote_file_path=DOWNLOAD_INFO["remote_path"]
+ )
logger.info(url)
- assert 'https://downloads.arista.com/EOS-USA/Active%20Releases/' in url
+ assert "https://downloads.arista.com/EOS-USA/Active%20Releases/" in url
-@pytest.mark.usefixtures("create_download_instance")
-@pytest.mark.parametrize("DOWNLOAD_INFO", eos_dataset_invalid, ids=['EOS-FAKE'])
-class TestEosDownload_invalid():
+@pytest.mark.usefixtures("create_download_instance")
+@pytest.mark.parametrize("DOWNLOAD_INFO", eos_dataset_invalid, ids=["EOS-FAKE"])
+class TestEosDownload_invalid:
def test_data(self, DOWNLOAD_INFO):
- logger.info(f'test input: {dict(DOWNLOAD_INFO)}')
- logger.info(f'test build: {self.eos_downloader.__dict__}')
+ logger.info(f"test input: {dict(DOWNLOAD_INFO)}")
+ logger.info(f"test build: {self.eos_downloader.__dict__}")
def test_eos_download_login_error(self, DOWNLOAD_INFO):
my_download = eos_downloader.eos.EOSDownloader(
- image=DOWNLOAD_INFO['image'],
- software=DOWNLOAD_INFO['software'],
- version=DOWNLOAD_INFO['version'],
+ image=DOWNLOAD_INFO["image"],
+ software=DOWNLOAD_INFO["software"],
+ version=DOWNLOAD_INFO["version"],
token=eos_token_invalid,
- hash_method=DOWNLOAD_INFO['expected_hash'])
+ hash_method=DOWNLOAD_INFO["expected_hash"],
+ )
assert my_download.authenticate() is False
- @pytest.mark.dependency(name='authentication')
- @pytest.mark.skipif(eos_token == eos_token_invalid, reason="Token is not set correctly")
+ @pytest.mark.dependency(name="authentication")
+ @pytest.mark.skipif(
+ eos_token == eos_token_invalid, reason="Token is not set correctly"
+ )
@pytest.mark.skipif(is_on_github_actions(), reason="Running on Github Runner")
# @pytest.mark.xfail(reason="Deliberate - CI not set for testing AUTH")
@pytest.mark.webtest
@@ -96,46 +116,48 @@ class TestEosDownload_invalid():
# @pytest.mark.skip(reason="Not yet implemented in lib")
def test_eos_file_name_with_incorrect_software(self, DOWNLOAD_INFO):
- self.eos_downloader.software = 'FAKE'
- logger.info(f'test build: {self.eos_downloader.__dict__}')
+ self.eos_downloader.software = "FAKE"
+ logger.info(f"test build: {self.eos_downloader.__dict__}")
with pytest.raises(ValueError) as e_info:
result = self.eos_downloader._build_filename()
- logger.info(f'receive exception: {e_info}')
- self.eos_downloader.software = DOWNLOAD_INFO['software']
+ logger.info(f"receive exception: {e_info}")
+ self.eos_downloader.software = DOWNLOAD_INFO["software"]
@pytest.mark.webtest
- @pytest.mark.dependency(depends=["authentication"], scope='class')
- def test_eos_download_get_remote_file_path_for_invlaid_software(self, DOWNLOAD_INFO):
- self.eos_downloader.software = 'FAKE'
- logger.info(f'Platform set to: {self.eos_downloader.software}')
- logger.info(f'test build: {self.eos_downloader.__dict__}')
+ @pytest.mark.dependency(depends=["authentication"], scope="class")
+ def test_eos_download_get_remote_file_path_for_invlaid_software(
+ self, DOWNLOAD_INFO
+ ):
+ self.eos_downloader.software = "FAKE"
+ logger.info(f"Platform set to: {self.eos_downloader.software}")
+ logger.info(f"test build: {self.eos_downloader.__dict__}")
with pytest.raises(ValueError) as e_info:
result = self.eos_downloader._build_filename()
- logger.info(f'receive exception: {e_info}')
- self.eos_downloader.software = DOWNLOAD_INFO['software']
+ logger.info(f"receive exception: {e_info}")
+ self.eos_downloader.software = DOWNLOAD_INFO["software"]
# IMAGE TESTING
def test_eos_file_name_with_incorrect_image(self, DOWNLOAD_INFO):
- self.eos_downloader.image = 'FAKE'
- logger.info(f'Image set to: {self.eos_downloader.image}')
- assert DOWNLOAD_INFO['filename'] == self.eos_downloader._build_filename()
- self.eos_downloader.software == DOWNLOAD_INFO['image']
+ self.eos_downloader.image = "FAKE"
+ logger.info(f"Image set to: {self.eos_downloader.image}")
+ assert DOWNLOAD_INFO["filename"] == self.eos_downloader._build_filename()
+ self.eos_downloader.software == DOWNLOAD_INFO["image"]
@pytest.mark.webtest
- @pytest.mark.dependency(depends=["authentication"], scope='class')
+ @pytest.mark.dependency(depends=["authentication"], scope="class")
def test_eos_download_get_remote_file_path_for_invlaid_image(self, DOWNLOAD_INFO):
- self.eos_downloader.image = 'FAKE'
- logger.info(f'Image set to: {self.eos_downloader.image}')
+ self.eos_downloader.image = "FAKE"
+ logger.info(f"Image set to: {self.eos_downloader.image}")
assert self.eos_downloader.authenticate() is True
- assert DOWNLOAD_INFO['filename'] == self.eos_downloader._build_filename()
- self.eos_downloader.image = DOWNLOAD_INFO['image']
+ assert DOWNLOAD_INFO["filename"] == self.eos_downloader._build_filename()
+ self.eos_downloader.image = DOWNLOAD_INFO["image"]
# VERSION TESTING
@pytest.mark.webtest
- @pytest.mark.dependency(depends=["authentication"], scope='class')
+ @pytest.mark.dependency(depends=["authentication"], scope="class")
def test_eos_download_get_remote_file_path_for_invlaid_version(self, DOWNLOAD_INFO):
- self.eos_downloader.version = 'FAKE'
- logger.info(f'Version set to: {self.eos_downloader.version}')
- assert self.eos_downloader._get_remote_filepath() == '' \ No newline at end of file
+ self.eos_downloader.version = "FAKE"
+ logger.info(f"Version set to: {self.eos_downloader.version}")
+ assert self.eos_downloader._get_remote_filepath() == ""