summaryrefslogtreecommitdiffstats
path: root/asynceapi
diff options
context:
space:
mode:
Diffstat (limited to 'asynceapi')
-rw-r--r--asynceapi/__init__.py12
-rw-r--r--asynceapi/aio_portcheck.py58
-rw-r--r--asynceapi/config_session.py289
-rw-r--r--asynceapi/device.py291
-rw-r--r--asynceapi/errors.py42
5 files changed, 692 insertions, 0 deletions
diff --git a/asynceapi/__init__.py b/asynceapi/__init__.py
new file mode 100644
index 0000000..d6586cf
--- /dev/null
+++ b/asynceapi/__init__.py
@@ -0,0 +1,12 @@
+# Copyright (c) 2024 Arista Networks, Inc.
+# Use of this source code is governed by the Apache License 2.0
+# that can be found in the LICENSE file.
+# Initially written by Jeremy Schulman at https://github.com/jeremyschulman/aio-eapi
+
+"""Arista EOS eAPI asyncio client."""
+
+from .config_session import SessionConfig
+from .device import Device
+from .errors import EapiCommandError
+
+__all__ = ["Device", "SessionConfig", "EapiCommandError"]
diff --git a/asynceapi/aio_portcheck.py b/asynceapi/aio_portcheck.py
new file mode 100644
index 0000000..79f4562
--- /dev/null
+++ b/asynceapi/aio_portcheck.py
@@ -0,0 +1,58 @@
+# Copyright (c) 2024 Arista Networks, Inc.
+# Use of this source code is governed by the Apache License 2.0
+# that can be found in the LICENSE file.
+# Initially written by Jeremy Schulman at https://github.com/jeremyschulman/aio-eapi
+"""Utility function to check if a port is open."""
+# -----------------------------------------------------------------------------
+# System Imports
+# -----------------------------------------------------------------------------
+
+from __future__ import annotations
+
+import asyncio
+import socket
+from typing import TYPE_CHECKING
+
+# -----------------------------------------------------------------------------
+# Public Imports
+# -----------------------------------------------------------------------------
+
+if TYPE_CHECKING:
+ from httpx import URL
+
+# -----------------------------------------------------------------------------
+# Exports
+# -----------------------------------------------------------------------------
+
+__all__ = ["port_check_url"]
+
+# -----------------------------------------------------------------------------
+#
+# CODE BEGINS
+#
+# -----------------------------------------------------------------------------
+
+
+async def port_check_url(url: URL, timeout: int = 5) -> bool:
+ """
+ Open the port designated by the URL given the timeout in seconds.
+
+ If the port is available then return True; False otherwise.
+
+ Parameters
+ ----------
+ url: The URL that provides the target system
+ timeout: Time to await for the port to open in seconds
+ """
+ port = url.port or socket.getservbyname(url.scheme)
+
+ try:
+ wr: asyncio.StreamWriter
+ _, wr = await asyncio.wait_for(asyncio.open_connection(host=url.host, port=port), timeout=timeout)
+
+ # MUST close if opened!
+ wr.close()
+
+ except TimeoutError:
+ return False
+ return True
diff --git a/asynceapi/config_session.py b/asynceapi/config_session.py
new file mode 100644
index 0000000..4054f14
--- /dev/null
+++ b/asynceapi/config_session.py
@@ -0,0 +1,289 @@
+# Copyright (c) 2024 Arista Networks, Inc.
+# Use of this source code is governed by the Apache License 2.0
+# that can be found in the LICENSE file.
+# Initially written by Jeremy Schulman at https://github.com/jeremyschulman/aio-eapi
+"""asynceapi.SessionConfig definition."""
+
+# -----------------------------------------------------------------------------
+# System Imports
+# -----------------------------------------------------------------------------
+from __future__ import annotations
+
+import re
+from typing import TYPE_CHECKING, Any
+
+if TYPE_CHECKING:
+ from .device import Device
+
+# -----------------------------------------------------------------------------
+# Exports
+# -----------------------------------------------------------------------------
+
+__all__ = ["SessionConfig"]
+
+# -----------------------------------------------------------------------------
+#
+# CODE BEGINS
+#
+# -----------------------------------------------------------------------------
+
+
+class SessionConfig:
+ """
+ Send configuration to a device using the EOS session mechanism.
+
+ This is the preferred way of managing configuration changes.
+
+ Notes
+ -----
+ This class definition is used by the parent Device class definition as
+ defined by `config_session`. A Caller can use the SessionConfig directly
+ as well, but it is not required.
+ """
+
+ CLI_CFG_FACTORY_RESET = "rollback clean-config"
+
+ def __init__(self, device: Device, name: str) -> None:
+ """
+ Create a new instance of SessionConfig.
+
+ The session config instance bound
+ to the given device instance, and using the session `name`.
+
+ Parameters
+ ----------
+ device: The associated device instance
+ name: The name of the config session
+ """
+ self._device = device
+ self._cli = device.cli
+ self._name = name
+ self._cli_config_session = f"configure session {self.name}"
+
+ # -------------------------------------------------------------------------
+ # properties for read-only attributes
+ # -------------------------------------------------------------------------
+
+ @property
+ def name(self) -> str:
+ """Return read-only session name attribute."""
+ return self._name
+
+ @property
+ def device(self) -> Device:
+ """Return read-only device instance attribute."""
+ return self._device
+
+ # -------------------------------------------------------------------------
+ # Public Methods
+ # -------------------------------------------------------------------------
+
+ async def status_all(self) -> dict[str, Any]:
+ """
+ Get the status of all the session config on the device.
+
+ Run the following command on the device:
+ # show configuration sessions detail
+
+ Returns
+ -------
+ Dict object of native EOS eAPI response; see `status` method for
+ details.
+
+ Examples
+ --------
+ {
+ "maxSavedSessions": 1,
+ "maxOpenSessions": 5,
+ "sessions": {
+ "jeremy1": {
+ "instances": {},
+ "state": "pending",
+ "commitUser": "",
+ "description": ""
+ },
+ "ansible_167510439362": {
+ "instances": {},
+ "state": "completed",
+ "commitUser": "joe.bob",
+ "description": "",
+ "completedTime": 1675104396.4500246
+ }
+ }
+ }
+ """
+ return await self._cli("show configuration sessions detail") # type: ignore[return-value] # json outformat returns dict[str, Any]
+
+ async def status(self) -> dict[str, Any] | None:
+ """
+ Get the status of a session config on the device.
+
+ Run the following command on the device:
+ # show configuration sessions detail
+
+ And return only the status dictionary for this session. If you want
+ all sessions, then use the `status_all` method.
+
+ Returns
+ -------
+ Dict instance of the session status. If the session does not exist,
+ then this method will return None.
+
+ The native eAPI results from JSON output, see example:
+
+ Examples
+ --------
+ all results:
+ {
+ "maxSavedSessions": 1,
+ "maxOpenSessions": 5,
+ "sessions": {
+ "jeremy1": {
+ "instances": {},
+ "state": "pending",
+ "commitUser": "",
+ "description": ""
+ },
+ "ansible_167510439362": {
+ "instances": {},
+ "state": "completed",
+ "commitUser": "joe.bob",
+ "description": "",
+ "completedTime": 1675104396.4500246
+ }
+ }
+ }
+
+ if the session name was 'jeremy1', then this method would return
+ {
+ "instances": {},
+ "state": "pending",
+ "commitUser": "",
+ "description": ""
+ }
+ """
+ res = await self.status_all()
+ return res["sessions"].get(self.name)
+
+ async def push(self, content: list[str] | str, *, replace: bool = False) -> None:
+ """
+ Send the configuration content to the device.
+
+ If `replace` is true, then the command "rollback clean-config" is issued
+ before sending the configuration content.
+
+ Parameters
+ ----------
+ content:
+ The text configuration CLI commands, as a list of strings, that
+ will be sent to the device. If the parameter is a string, and not
+ a list, then split the string across linebreaks. In either case
+ any empty lines will be discarded before they are send to the
+ device.
+ replace:
+ When True, the content will replace the existing configuration
+ on the device.
+ """
+ # if given s string, we need to break it up into individual command
+ # lines.
+
+ if isinstance(content, str):
+ content = content.splitlines()
+
+ # prepare the initial set of command to enter the config session and
+ # rollback clean if the `replace` argument is True.
+
+ commands: list[str | dict[str, Any]] = [self._cli_config_session]
+ if replace:
+ commands.append(self.CLI_CFG_FACTORY_RESET)
+
+ # add the Caller's commands, filtering out any blank lines. any command
+ # lines (!) are still included.
+
+ commands.extend(filter(None, content))
+
+ await self._cli(commands=commands)
+
+ async def commit(self, timer: str | None = None) -> None:
+ """
+ Commit the session config.
+
+ Run the following command on the device:
+ # configure session <name>
+ # commit
+
+ If the timer is specified, format is "hh:mm:ss", then a commit timer is
+ started. A second commit action must be made to confirm the config
+ session before the timer expires; otherwise the config-session is
+ automatically aborted.
+ """
+ command = f"{self._cli_config_session} commit"
+
+ if timer:
+ command += f" timer {timer}"
+
+ await self._cli(command)
+
+ async def abort(self) -> None:
+ """
+ Abort the configuration session.
+
+ Run the following command on the device:
+ # configure session <name> abort
+ """
+ await self._cli(f"{self._cli_config_session} abort")
+
+ async def diff(self) -> str:
+ """
+ Return the "diff" of the session config relative to the running config.
+
+ Run the following command on the device:
+ # show session-config named <name> diffs
+
+ Returns
+ -------
+ Return a string in diff-patch format.
+
+ References
+ ----------
+ * https://www.gnu.org/software/diffutils/manual/diffutils.txt
+ """
+ return await self._cli(f"show session-config named {self.name} diffs", ofmt="text") # type: ignore[return-value] # text outformat returns str
+
+ async def load_file(self, filename: str, *, replace: bool = False) -> None:
+ """
+ Load the configuration from <filename> into the session configuration.
+
+ If the replace parameter is True then the file contents will replace the existing session config (load-replace).
+
+ Parameters
+ ----------
+ filename:
+ The name of the configuration file. The caller is required to
+ specify the filesystem, for example, the
+ filename="flash:thisfile.cfg"
+
+ replace:
+ When True, the contents of the file will completely replace the
+ session config for a load-replace behavior.
+
+ Raises
+ ------
+ If there are any issues with loading the configuration file then a
+ RuntimeError is raised with the error messages content.
+ """
+ commands: list[str | dict[str, Any]] = [self._cli_config_session]
+ if replace:
+ commands.append(self.CLI_CFG_FACTORY_RESET)
+
+ commands.append(f"copy {filename} session-config")
+ res: list[dict[str, Any]] = await self._cli(commands=commands) # type: ignore[assignment] # JSON outformat of multiple commands returns list[dict[str, Any]]
+ checks_re = re.compile(r"error|abort|invalid", flags=re.I)
+ messages = res[-1]["messages"]
+
+ if any(map(checks_re.search, messages)):
+ raise RuntimeError("".join(messages))
+
+ async def write(self) -> None:
+ """Save the running config to the startup config by issuing the command "write" to the device."""
+ await self._cli("write")
diff --git a/asynceapi/device.py b/asynceapi/device.py
new file mode 100644
index 0000000..04ec3ab
--- /dev/null
+++ b/asynceapi/device.py
@@ -0,0 +1,291 @@
+# Copyright (c) 2024 Arista Networks, Inc.
+# Use of this source code is governed by the Apache License 2.0
+# that can be found in the LICENSE file.
+# Initially written by Jeremy Schulman at https://github.com/jeremyschulman/aio-eapi
+"""asynceapi.Device definition."""
+# -----------------------------------------------------------------------------
+# System Imports
+# -----------------------------------------------------------------------------
+
+from __future__ import annotations
+
+from socket import getservbyname
+from typing import TYPE_CHECKING, Any
+
+# -----------------------------------------------------------------------------
+# Public Imports
+# -----------------------------------------------------------------------------
+import httpx
+
+# -----------------------------------------------------------------------------
+# Private Imports
+# -----------------------------------------------------------------------------
+from .aio_portcheck import port_check_url
+from .config_session import SessionConfig
+from .errors import EapiCommandError
+
+if TYPE_CHECKING:
+ from collections.abc import Sequence
+
+# -----------------------------------------------------------------------------
+# Exports
+# -----------------------------------------------------------------------------
+
+
+__all__ = ["Device"]
+
+
+# -----------------------------------------------------------------------------
+#
+# CODE BEGINS
+#
+# -----------------------------------------------------------------------------
+
+
+class Device(httpx.AsyncClient):
+ """
+ Represent the async JSON-RPC client that communicates with an Arista EOS device.
+
+ This class inherits directly from the
+ httpx.AsyncClient, so any initialization options can be passed directly.
+ """
+
+ auth = None
+ EAPI_OFMT_OPTIONS = ("json", "text")
+ EAPI_DEFAULT_OFMT = "json"
+
+ def __init__( # noqa: PLR0913 # pylint: disable=too-many-arguments
+ self,
+ host: str | None = None,
+ username: str | None = None,
+ password: str | None = None,
+ proto: str = "https",
+ port: str | int | None = None,
+ **kwargs: Any, # noqa: ANN401
+ ) -> None:
+ """
+ Initialize the Device class.
+
+ As a subclass to httpx.AsyncClient, the caller can provide any of those initializers.
+ Specific parameters for Device class are all optional and described below.
+
+ Parameters
+ ----------
+ host: The EOS target device, either hostname (DNS) or ipaddress.
+ username: The login user-name; requires the password parameter.
+ password: The login password; requires the username parameter.
+ proto: The protocol, http or https, to communicate eAPI with the device.
+ port: If not provided, the proto value is used to look up the associated
+ port (http=80, https=443). If provided, overrides the port used to
+ communite with the device.
+
+ Other Parameters
+ ----------------
+ base_url: str
+ If provided, the complete URL to the device eAPI endpoint.
+
+ auth:
+ If provided, used as the httpx authorization initializer value. If
+ not provided, then username+password is assumed by the Caller and
+ used to create a BasicAuth instance.
+ """
+ self.port = port or getservbyname(proto)
+ self.host = host
+ kwargs.setdefault("base_url", httpx.URL(f"{proto}://{self.host}:{self.port}"))
+ kwargs.setdefault("verify", False)
+
+ if username and password:
+ self.auth = httpx.BasicAuth(username, password)
+
+ kwargs.setdefault("auth", self.auth)
+
+ super().__init__(**kwargs)
+ self.headers["Content-Type"] = "application/json-rpc"
+
+ async def check_connection(self) -> bool:
+ """
+ Check the target device to ensure that the eAPI port is open and accepting connections.
+
+ It is recommended that a Caller checks the connection before involving cli commands,
+ but this step is not required.
+
+ Returns
+ -------
+ True when the device eAPI is accessible, False otherwise.
+ """
+ return await port_check_url(self.base_url)
+
+ async def cli( # noqa: PLR0913 # pylint: disable=too-many-arguments
+ self,
+ command: str | dict[str, Any] | None = None,
+ commands: Sequence[str | dict[str, Any]] | None = None,
+ ofmt: str | None = None,
+ version: int | str | None = "latest",
+ *,
+ suppress_error: bool = False,
+ auto_complete: bool = False,
+ expand_aliases: bool = False,
+ req_id: int | str | None = None,
+ ) -> list[dict[str, Any] | str] | dict[str, Any] | str | None:
+ """
+ Execute one or more CLI commands.
+
+ Parameters
+ ----------
+ command:
+ A single command to execute; results in a single output response
+ commands:
+ A list of commands to execute; results in a list of output responses
+ ofmt:
+ Either 'json' or 'text'; indicates the output format for the CLI commands.
+ version:
+ By default the eAPI will use "version 1" for all API object models.
+ This driver will, by default, always set version to "latest" so
+ that the behavior matches the CLI of the device. The caller can
+ override the "latest" behavior by explicitly setting the version.
+ suppress_error:
+ When not False, then if the execution of the command would-have
+ raised an EapiCommandError, rather than raising this exception this
+ routine will return the value None.
+
+ For example, if the following command had raised
+ EapiCommandError, now response would be set to None instead.
+
+ response = dev.cli(..., suppress_error=True)
+ auto_complete:
+ Enabled/disables the command auto-compelete feature of the EAPI. Per the
+ documentation:
+ Allows users to use shorthand commands in eAPI calls. With this
+ parameter included a user can send 'sh ver' via eAPI to get the
+ output of 'show version'.
+ expand_aliases:
+ Enables/disables the command use of User defined alias. Per the
+ documentation:
+ Allowed users to provide the expandAliases parameter to eAPI
+ calls. This allows users to use aliased commands via the API.
+ For example if an alias is configured as 'sv' for 'show version'
+ then an API call with sv and the expandAliases parameter will
+ return the output of show version.
+ req_id:
+ A unique identifier that will be echoed back by the switch. May be a string or number.
+
+ Returns
+ -------
+ One or List of output responses, per the description above.
+ """
+ if not any((command, commands)):
+ msg = "Required 'command' or 'commands'"
+ raise RuntimeError(msg)
+
+ jsonrpc = self._jsonrpc_command(
+ commands=[command] if command else commands, ofmt=ofmt, version=version, auto_complete=auto_complete, expand_aliases=expand_aliases, req_id=req_id
+ )
+
+ try:
+ res = await self.jsonrpc_exec(jsonrpc)
+ return res[0] if command else res
+ except EapiCommandError:
+ if suppress_error:
+ return None
+ raise
+
+ def _jsonrpc_command( # noqa: PLR0913 # pylint: disable=too-many-arguments
+ self,
+ commands: Sequence[str | dict[str, Any]] | None = None,
+ ofmt: str | None = None,
+ version: int | str | None = "latest",
+ *,
+ auto_complete: bool = False,
+ expand_aliases: bool = False,
+ req_id: int | str | None = None,
+ ) -> dict[str, Any]:
+ """Create the JSON-RPC command dictionary object."""
+ cmd: dict[str, Any] = {
+ "jsonrpc": "2.0",
+ "method": "runCmds",
+ "params": {
+ "version": version,
+ "cmds": commands,
+ "format": ofmt or self.EAPI_DEFAULT_OFMT,
+ },
+ "id": req_id or id(self),
+ }
+ if auto_complete is not None:
+ cmd["params"].update({"autoComplete": auto_complete})
+
+ if expand_aliases is not None:
+ cmd["params"].update({"expandAliases": expand_aliases})
+
+ return cmd
+
+ async def jsonrpc_exec(self, jsonrpc: dict[str, Any]) -> list[dict[str, Any] | str]:
+ """
+ Execute the JSON-RPC dictionary object.
+
+ Parameters
+ ----------
+ jsonrpc:
+ The JSON-RPC as created by the `meth`:_jsonrpc_command().
+
+ Raises
+ ------
+ EapiCommandError
+ In the event that a command resulted in an error response.
+
+ Returns
+ -------
+ The list of command results; either dict or text depending on the
+ JSON-RPC format parameter.
+ """
+ res = await self.post("/command-api", json=jsonrpc)
+ res.raise_for_status()
+ body = res.json()
+
+ commands = jsonrpc["params"]["cmds"]
+ ofmt = jsonrpc["params"]["format"]
+
+ get_output = (lambda _r: _r["output"]) if ofmt == "text" else (lambda _r: _r)
+
+ # if there are no errors then return the list of command results.
+ if (err_data := body.get("error")) is None:
+ return [get_output(cmd_res) for cmd_res in body["result"]]
+
+ # ---------------------------------------------------------------------
+ # if we are here, then there were some command errors. Raise a
+ # EapiCommandError exception with args (commands that failed, passed,
+ # not-executed).
+ # ---------------------------------------------------------------------
+
+ # -------------------------- eAPI specification ----------------------
+ # On an error, no result object is present, only an error object, which
+ # is guaranteed to have the following attributes: code, messages, and
+ # data. Similar to the result object in the successful response, the
+ # data object is a list of objects corresponding to the results of all
+ # commands up to, and including, the failed command. If there was a an
+ # error before any commands were executed (e.g. bad credentials), data
+ # will be empty. The last object in the data array will always
+ # correspond to the failed command. The command failure details are
+ # always stored in the errors array.
+
+ cmd_data = err_data["data"]
+ len_data = len(cmd_data)
+ err_at = len_data - 1
+ err_msg = err_data["message"]
+
+ raise EapiCommandError(
+ passed=[get_output(cmd_data[cmd_i]) for cmd_i, cmd in enumerate(commands[:err_at])],
+ failed=commands[err_at]["cmd"],
+ errors=cmd_data[err_at]["errors"],
+ errmsg=err_msg,
+ not_exec=commands[err_at + 1 :],
+ )
+
+ def config_session(self, name: str) -> SessionConfig:
+ """
+ return a SessionConfig instance bound to this device with the given session name.
+
+ Parameters
+ ----------
+ name: The config-session name
+ """
+ return SessionConfig(self, name)
diff --git a/asynceapi/errors.py b/asynceapi/errors.py
new file mode 100644
index 0000000..614427a
--- /dev/null
+++ b/asynceapi/errors.py
@@ -0,0 +1,42 @@
+# Copyright (c) 2024 Arista Networks, Inc.
+# Use of this source code is governed by the Apache License 2.0
+# that can be found in the LICENSE file.
+# Initially written by Jeremy Schulman at https://github.com/jeremyschulman/aio-eapi
+"""asynceapi module exceptions."""
+
+from __future__ import annotations
+
+from typing import Any
+
+import httpx
+
+
+class EapiCommandError(RuntimeError):
+ """
+ Exception class for EAPI command errors.
+
+ Attributes
+ ----------
+ failed: the failed command
+ errmsg: a description of the failure reason
+ errors: the command failure details
+ passed: a list of command results of the commands that passed
+ not_exec: a list of commands that were not executed
+ """
+
+ def __init__(self, failed: str, errors: list[str], errmsg: str, passed: list[str | dict[str, Any]], not_exec: list[dict[str, Any]]) -> None: # noqa: PLR0913 # pylint: disable=too-many-arguments
+ """Initialize for the EapiCommandError exception."""
+ self.failed = failed
+ self.errmsg = errmsg
+ self.errors = errors
+ self.passed = passed
+ self.not_exec = not_exec
+ super().__init__()
+
+ def __str__(self) -> str:
+ """Return the error message associated with the exception."""
+ return self.errmsg
+
+
+# alias for exception during sending-receiving
+EapiTransportError = httpx.HTTPStatusError