summaryrefslogtreecommitdiffstats
path: root/anta/cli/exec/utils.py
blob: 758072c7ef9a4cae33ae1af4f19da91a184cebe5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# Copyright (c) 2023-2024 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the LICENSE file.

"""Exec CLI helpers."""

from __future__ import annotations

import asyncio
import itertools
import json
import logging
import re
from pathlib import Path
from typing import TYPE_CHECKING, Literal

from aioeapi import EapiCommandError
from click.exceptions import UsageError
from httpx import ConnectError, HTTPError

from anta.device import AntaDevice, AsyncEOSDevice
from anta.models import AntaCommand

if TYPE_CHECKING:
    from anta.inventory import AntaInventory

# Directory on the device that collect_scheduled_show_tech lists (`ls -1t`) to find the tech-support files to copy.
EOS_SCHEDULED_TECH_SUPPORT = "/mnt/flash/schedule/tech-support"
# NOTE(review): not referenced in the visible part of this file; presumably characters considered unsafe in names - confirm.
INVALID_CHAR = "`~!@#$/"
# Module-level logger named after this module.
logger = logging.getLogger(__name__)


async def clear_counters_utils(anta_inventory: AntaInventory, tags: set[str] | None = None) -> None:
    """Clear counters on the devices of the inventory.

    Connects the inventory, then runs the clear commands concurrently on every
    device returned by `get_inventory(established_only=True, tags=tags)`.

    Parameters
    ----------
    anta_inventory
        Inventory to connect to and to clear the counters on.
    tags
        Optional tags forwarded to `get_inventory` to select the devices.
    """

    async def clear(dev: AntaDevice) -> None:
        """Run the clear commands on one device and log the outcome."""
        commands = [AntaCommand(command="clear counters")]
        # The hardware drop counters command is not sent to the cEOSLab / vEOS-lab models.
        if dev.hw_model not in ("cEOSLab", "vEOS-lab"):
            commands.append(AntaCommand(command="clear hardware counter drop"))
        await dev.collect_commands(commands=commands)
        failed = [command for command in commands if not command.collected]
        for command in failed:
            logger.error("Could not clear counters on device %s: %s", dev.name, command.errors)
        # Only report success when every command was actually collected;
        # otherwise the error(s) above would be followed by a contradictory message.
        if not failed:
            logger.info("Cleared counters on %s (%s)", dev.name, dev.hw_model)

    logger.info("Connecting to devices...")
    await anta_inventory.connect_inventory()
    devices = anta_inventory.get_inventory(established_only=True, tags=tags).devices
    logger.info("Clearing counters on remote devices...")
    await asyncio.gather(*(clear(device) for device in devices))


async def collect_commands(
    inv: AntaInventory,
    commands: dict[str, list[str]],
    root_dir: Path,
    tags: set[str] | None = None,
) -> None:
    """Collect EOS commands and store each output in its own file.

    Connects the inventory, then runs every requested command on every device
    returned by `get_inventory(established_only=True, tags=tags)`. Each output
    is written to `<root_dir>/<device name>/<json|text>/<command>.<json|log>`.

    Parameters
    ----------
    inv
        Inventory to connect to and to collect the commands from.
    commands
        Mapping with the optional keys "json_format" and "text_format"; each
        value is iterated and every element is run as one command.
    root_dir
        Directory under which the per-device output directories are created.
    tags
        Optional tags forwarded to `get_inventory` to select the devices.
    """

    async def collect(dev: AntaDevice, command: str, outformat: Literal["json", "text"]) -> None:
        """Run one command on one device and write its output to disk."""
        outdir = Path() / root_dir / dev.name / outformat
        outdir.mkdir(parents=True, exist_ok=True)
        # Build the file name from the command: every "/" and a trailing "|" become "_".
        safe_command = re.sub(r"(/|\|$)", "_", command)
        c = AntaCommand(command=command, ofmt=outformat)
        await dev.collect(c)
        if not c.collected:
            logger.error("Could not collect commands on device %s: %s", dev.name, c.errors)
            return
        # Two-way branch (instead of if/elif) so outfile and content are always bound.
        if c.ofmt == "json":
            outfile = outdir / f"{safe_command}.json"
            content = json.dumps(c.json_output, indent=2)
        else:
            outfile = outdir / f"{safe_command}.log"
            content = c.text_output
        outfile.write_text(content, encoding="UTF-8")
        logger.info("Collected command '%s' from device %s (%s)", command, dev.name, dev.hw_model)

    logger.info("Connecting to devices...")
    await inv.connect_inventory()
    devices = inv.get_inventory(established_only=True, tags=tags).devices
    logger.info("Collecting commands from remote devices")
    coros = []
    if "json_format" in commands:
        coros += [collect(device, command, "json") for device, command in itertools.product(devices, commands["json_format"])]
    if "text_format" in commands:
        coros += [collect(device, command, "text") for device, command in itertools.product(devices, commands["text_format"])]
    # return_exceptions=True: one failing collection must not cancel the others;
    # the exceptions are reported below instead of being raised.
    res = await asyncio.gather(*coros, return_exceptions=True)
    for r in res:
        if isinstance(r, Exception):
            logger.error("Error when collecting commands: %s", str(r))


async def collect_scheduled_show_tech(inv: AntaInventory, root_dir: Path, *, configure: bool, tags: set[str] | None = None, latest: int | None = None) -> None:
    """Collect scheduled show-tech on devices.

    Connects the inventory, then, concurrently for every device returned by
    `get_inventory(established_only=True, tags=tags)`, copies the files listed in
    EOS_SCHEDULED_TECH_SUPPORT to `<root_dir>/<lower-cased device name>/`.

    Parameters
    ----------
    inv
        Inventory to connect to and to collect the files from.
    root_dir
        Directory under which one output directory per device is created.
    configure
        When no `aaa authorization exec default` line is found in the
        running-config: if True, `aaa authorization exec default local` is
        configured on the device before copying; if False, the device is
        skipped with an error.
    tags
        Optional tags forwarded to `get_inventory` to select the devices.
    latest
        When set to a non-zero value, the file listing is piped through
        `head -<latest>` so only that many entries are retrieved.

    Raises
    ------
    UsageError
        If the configuration must be pushed to a device that is not an AsyncEOSDevice.
    """

    async def collect(device: AntaDevice) -> None:
        """Collect all the tech-support files stored on Arista switches flash and copy them locally."""
        try:
            # Get the tech-support filename to retrieve
            cmd = f"bash timeout 10 ls -1t {EOS_SCHEDULED_TECH_SUPPORT}"
            if latest:
                cmd += f" | head -{latest}"
            command = AntaCommand(command=cmd, ofmt="text")
            await device.collect(command=command)
            if not (command.collected and command.text_output):
                logger.error("Unable to get tech-support filenames on %s: verify that %s is not empty", device.name, EOS_SCHEDULED_TECH_SUPPORT)
                return
            filenames = [Path(f"{EOS_SCHEDULED_TECH_SUPPORT}/{f}") for f in command.text_output.splitlines()]

            # Create directories
            outdir = Path() / root_dir / f"{device.name.lower()}"
            outdir.mkdir(parents=True, exist_ok=True)

            # Check if 'aaa authorization exec default local' is present in the running-config
            command = AntaCommand(command="show running-config | include aaa authorization exec default", ofmt="text")
            await device.collect(command=command)

            if command.collected and not command.text_output:
                logger.debug("'aaa authorization exec default local' is not configured on device %s", device.name)
                if not configure:
                    logger.error("Unable to collect tech-support on %s: configuration 'aaa authorization exec default local' is not present", device.name)
                    return
                # TODO: @mtache - add `config` field to `AntaCommand` object to handle this use case.
                # Otherwise mypy complains about enable as it is only implemented for AsyncEOSDevice
                # TODO: Should enable be also included in AntaDevice?
                if not isinstance(device, AsyncEOSDevice):
                    msg = "anta exec collect-tech-support is only supported with AsyncEOSDevice for now."
                    raise UsageError(msg)
                commands = []
                # Enter privileged mode first when the device is set up to use enable,
                # supplying the enable password when one is defined.
                if device.enable and device._enable_password is not None:  # pylint: disable=protected-access
                    commands.append({"cmd": "enable", "input": device._enable_password})  # pylint: disable=protected-access
                elif device.enable:
                    commands.append({"cmd": "enable"})
                commands.extend(
                    [
                        {"cmd": "configure terminal"},
                        {"cmd": "aaa authorization exec default local"},
                    ],
                )
                logger.warning("Configuring 'aaa authorization exec default local' on device %s", device.name)
                await device._session.cli(commands=commands)  # pylint: disable=protected-access
                logger.info("Configured 'aaa authorization exec default local' on device %s", device.name)
            elif command.collected:
                # Only claim the configuration was already there when the check actually found it.
                logger.debug("'aaa authorization exec default local' is already configured on device %s", device.name)
            else:
                # The check itself failed: the copy is still attempted, but say so.
                logger.warning("Unable to verify 'aaa authorization exec default local' on device %s: %s", device.name, command.errors)

            await device.copy(sources=filenames, destination=outdir, direction="from")
            logger.info("Collected %s scheduled tech-support from %s", len(filenames), device.name)

        except (EapiCommandError, HTTPError, ConnectError) as e:
            logger.error("Unable to collect tech-support on %s: %s", device.name, str(e))

    logger.info("Connecting to devices...")
    await inv.connect_inventory()
    devices = inv.get_inventory(established_only=True, tags=tags).devices
    await asyncio.gather(*(collect(device) for device in devices))