summaryrefslogtreecommitdiffstats
path: root/anta/cli/get/utils.py
blob: 179da0cb0b753aa2805d87d08fa14187dcc39d37 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# Copyright (c) 2023-2024 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the LICENSE file.
"""
Utility functions used by the anta.cli.get.commands module.
"""
from __future__ import annotations

import functools
import json
import logging
from pathlib import Path
from sys import stdin
from typing import Any

import click
import requests
import urllib3
import yaml

from anta.cli.utils import ExitCode
from anta.inventory import AntaInventory
from anta.inventory.models import AntaInventoryHost, AntaInventoryInput

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

logger = logging.getLogger(__name__)


def inventory_output_options(f: Any) -> Any:
    """
    Click decorator adding the options shared by commands that generate an inventory

    Two options are attached to the decorated command:
      * ``--output`` / ``-o``: mandatory destination path (also read from the ANTA_INVENTORY env var).
      * ``--overwrite``: flag to skip the confirmation when the destination already has content.

    The wrapper consumes ``overwrite`` and the Click context; only ``output`` (plus the
    remaining arguments) is forwarded to the decorated function.
    """

    @click.option(
        "--output",
        "-o",
        type=click.Path(file_okay=True, dir_okay=False, exists=False, writable=True, path_type=Path),
        help="Path to save inventory file",
        envvar="ANTA_INVENTORY",
        show_envvar=True,
        required=True,
    )
    @click.option(
        "--overwrite",
        is_flag=True,
        default=False,
        help="Do not prompt when overriding current inventory",
        show_default=True,
        show_envvar=True,
        required=False,
    )
    @click.pass_context
    @functools.wraps(f)
    def wrapper(ctx: click.Context, *args: tuple[Any], output: Path, overwrite: bool, **kwargs: dict[str, Any]) -> Any:
        # A missing file or a zero-byte file can be written without asking anything
        has_content = output.exists() and output.stat().st_size != 0
        if has_content and not overwrite:
            if not stdin.isatty():
                # Nobody can answer a prompt and --overwrite was not given: stop here
                logger.critical("Conversion aborted since destination file is not empty (not running in interactive TTY)")
                ctx.exit(ExitCode.USAGE_ERROR)
            else:
                # Interactive session: let the user decide, abort=True ends the command on a negative answer
                click.confirm(f"Your destination file '{output}' is not empty, continue?", abort=True)
        # Create the destination directory tree before handing over to the command
        output.parent.mkdir(parents=True, exist_ok=True)
        return f(*args, output=output, **kwargs)

    return wrapper


def get_cv_token(cvp_ip: str, cvp_username: str, cvp_password: str, *, verify_cert: bool = False, timeout: float = 10) -> str:
    """
    Generate AUTH token from CVP using password

    Args:
        cvp_ip: IP address or hostname of the CloudVision server.
        cvp_username: Username sent as ``userId`` in the authentication payload.
        cvp_password: Password of the user.
        verify_cert: Whether the server TLS certificate is verified. Defaults to False,
            which matches the historical behavior of this function.
        timeout: Timeout in seconds handed to the HTTP request. Defaults to 10.

    Returns:
        The ``sessionId`` value found in the JSON answer of the authentication endpoint.

    Errors are not handled here: exceptions raised by ``requests``, by the JSON decoding
    or by a missing ``sessionId`` key propagate to the caller.
    """
    # TODO: handle requests errors (connection failure, non-JSON answer, missing sessionId)

    # use CVP REST API to generate a token
    url = f"https://{cvp_ip}/cvpservice/login/authenticate.do"
    payload = json.dumps({"userId": cvp_username, "password": cvp_password})
    headers = {"Content-Type": "application/json", "Accept": "application/json"}

    response = requests.request("POST", url, headers=headers, data=payload, verify=verify_cert, timeout=timeout)
    return response.json()["sessionId"]


def write_inventory_to_file(hosts: list[AntaInventoryHost], output: Path) -> None:
    """
    Dump a list of inventory hosts as an ANTA inventory YAML file

    Args:
        hosts: Host models to store in the inventory.
        output: File to write; it is opened in "w" mode, so existing content is replaced.
    """
    inventory_input = AntaInventoryInput(hosts=hosts)
    with open(output, "w", encoding="UTF-8") as stream:
        # exclude_unset=True: only fields explicitly set on the models end up in the file
        document = {AntaInventory.INVENTORY_ROOT_KEY: inventory_input.model_dump(exclude_unset=True)}
        stream.write(yaml.dump(document))
    logger.info(f"ANTA inventory file has been created: '{output}'")


def create_inventory_from_cvp(inv: list[dict[str, Any]], output: Path) -> None:
    """
    Create an inventory file from Arista CloudVision inventory

    Args:
        inv: Device entries received from CloudVision. Each entry is read through its
            "hostname", "ipAddress" and "containerName" keys.
        output: ANTA inventory file to generate.
    """
    logger.debug(f"Received {len(inv)} device(s) from CloudVision")
    hosts: list[AntaInventoryHost] = []
    for dev in inv:
        logger.info(f"   * adding entry for {dev['hostname']}")
        # The lower-cased container name becomes the single tag of the host
        entry = AntaInventoryHost(
            name=dev["hostname"],
            host=dev["ipAddress"],
            tags=[dev["containerName"].lower()],
        )
        hosts.append(entry)
    write_inventory_to_file(hosts, output)


def create_inventory_from_ansible(inventory: Path, output: Path, ansible_group: str = "all") -> None:
    """
    Create an ANTA inventory from an Ansible inventory YAML file

    Args:
        inventory: Ansible Inventory file to read
        output: ANTA inventory file to generate.
        ansible_group: Ansible group from where to extract data.

    Raises:
        ValueError: if the Ansible inventory cannot be read or parsed, is empty,
            is not a YAML mapping, or does not contain the requested group.
    """

    def find_ansible_group(data: dict[str, Any], group: str) -> dict[str, Any] | None:
        """Depth-first search of a mapping named `group` holding a 'children' or 'hosts' key"""
        for k, v in data.items():
            if not isinstance(v, dict):
                continue
            if k == group and ("children" in v or "hosts" in v):
                return v
            found = find_ansible_group(v, group)
            if found is not None:
                return found
        return None

    def deep_yaml_parsing(data: dict[str, Any], hosts: list[AntaInventoryHost] | None = None) -> list[AntaInventoryHost]:
        """Deep parsing of YAML file to extract hosts and associated IPs"""
        if hosts is None:
            hosts = []
        for key, value in data.items():
            if not isinstance(value, dict):
                # Scalars and null values (e.g. a host declared without any variable) carry no
                # host definition. Skip them but keep scanning the remaining siblings: returning
                # here would silently drop every host declared after this key.
                continue
            if "ansible_host" in value:
                logger.info(f"   * adding entry for {key}")
                hosts.append(AntaInventoryHost(name=key, host=value["ansible_host"]))
            else:
                deep_yaml_parsing(value, hosts)
        return hosts

    try:
        with open(inventory, encoding="utf-8") as inv:
            ansible_inventory = yaml.safe_load(inv)
    except (OSError, yaml.YAMLError) as exc:
        # Unreadable file and malformed YAML are both reported as the same ValueError
        raise ValueError(f"Could not parse {inventory}.") from exc

    if not ansible_inventory:
        raise ValueError(f"Ansible inventory {inventory} is empty")

    if not isinstance(ansible_inventory, dict):
        # A YAML list or scalar at the root cannot hold groups; fail with a clear message
        raise ValueError(f"Ansible inventory {inventory} is not a YAML mapping")

    group_data = find_ansible_group(ansible_inventory, ansible_group)

    if group_data is None:
        raise ValueError(f"Group {ansible_group} not found in Ansible inventory")
    ansible_hosts = deep_yaml_parsing(group_data)
    write_inventory_to_file(ansible_hosts, output)