summaryrefslogtreecommitdiffstats
path: root/anta/custom_types.py
diff options
context:
space:
mode:
Diffstat (limited to 'anta/custom_types.py')
-rw-r--r--anta/custom_types.py68
1 files changed, 53 insertions, 15 deletions
diff --git a/anta/custom_types.py b/anta/custom_types.py
index 5de7a61..a0a0631 100644
--- a/anta/custom_types.py
+++ b/anta/custom_types.py
@@ -9,6 +9,31 @@ from typing import Annotated, Literal
from pydantic import Field
from pydantic.functional_validators import AfterValidator, BeforeValidator
+# Regular Expression definition
+# TODO: make this configurable - with an env var maybe?
+REGEXP_EOS_BLACKLIST_CMDS = [r"^reload.*", r"^conf\w*\s*(terminal|session)*", r"^wr\w*\s*\w+"]
+"""List of regular expressions to blacklist from eos commands."""
+REGEXP_PATH_MARKERS = r"[\\\/\s]"
+"""Match directory path from string."""
+REGEXP_INTERFACE_ID = r"\d+(\/\d+)*(\.\d+)?"
+"""Match Interface ID lilke 1/1.1."""
+REGEXP_TYPE_EOS_INTERFACE = r"^(Dps|Ethernet|Fabric|Loopback|Management|Port-Channel|Tunnel|Vlan|Vxlan)[0-9]+(\/[0-9]+)*(\.[0-9]+)?$"
+"""Match EOS interface types like Ethernet1/1, Vlan1, Loopback1, etc."""
+REGEXP_TYPE_VXLAN_SRC_INTERFACE = r"^(Loopback)([0-9]|[1-9][0-9]{1,2}|[1-7][0-9]{3}|8[01][0-9]{2}|819[01])$"
+"""Match Vxlan source interface like Loopback10."""
+REGEXP_TYPE_HOSTNAME = r"^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$"
+"""Match hostname like `my-hostname`, `my-hostname-1`, `my-hostname-1-2`."""
+
+# Regexp BGP AFI/SAFI
+REGEXP_BGP_L2VPN_AFI = r"\b(l2[\s\-]?vpn[\s\-]?evpn)\b"
+"""Match L2VPN EVPN AFI."""
+REGEXP_BGP_IPV4_MPLS_LABELS = r"\b(ipv4[\s\-]?mpls[\s\-]?label(s)?)\b"
+"""Match IPv4 MPLS Labels."""
+REGEX_BGP_IPV4_MPLS_VPN = r"\b(ipv4[\s\-]?mpls[\s\-]?vpn)\b"
+"""Match IPv4 MPLS VPN."""
+REGEX_BGP_IPV4_UNICAST = r"\b(ipv4[\s\-]?uni[\s\-]?cast)\b"
+"""Match IPv4 Unicast."""
+
def aaa_group_prefix(v: str) -> str:
"""Prefix the AAA method with 'group' if it is known."""
@@ -24,7 +49,7 @@ def interface_autocomplete(v: str) -> str:
- `po` will be changed to `Port-Channel`
- `lo` will be changed to `Loopback`
"""
- intf_id_re = re.compile(r"[0-9]+(\/[0-9]+)*(\.[0-9]+)?")
+ intf_id_re = re.compile(REGEXP_INTERFACE_ID)
m = intf_id_re.search(v)
if m is None:
msg = f"Could not parse interface ID in interface '{v}'"
@@ -33,11 +58,7 @@ def interface_autocomplete(v: str) -> str:
alias_map = {"et": "Ethernet", "eth": "Ethernet", "po": "Port-Channel", "lo": "Loopback"}
- for alias, full_name in alias_map.items():
- if v.lower().startswith(alias):
- return f"{full_name}{intf_id}"
-
- return v
+ return next((f"{full_name}{intf_id}" for alias, full_name in alias_map.items() if v.lower().startswith(alias)), v)
def interface_case_sensitivity(v: str) -> str:
@@ -50,7 +71,7 @@ def interface_case_sensitivity(v: str) -> str:
- loopback -> Loopback
"""
- if isinstance(v, str) and len(v) > 0 and not v[0].isupper():
+ if isinstance(v, str) and v != "" and not v[0].isupper():
return f"{v[0].upper()}{v[1:]}"
return v
@@ -67,10 +88,10 @@ def bgp_multiprotocol_capabilities_abbreviations(value: str) -> str:
"""
patterns = {
- r"\b(l2[\s\-]?vpn[\s\-]?evpn)\b": "l2VpnEvpn",
- r"\bipv4[\s_-]?mpls[\s_-]?label(s)?\b": "ipv4MplsLabels",
- r"\bipv4[\s_-]?mpls[\s_-]?vpn\b": "ipv4MplsVpn",
- r"\bipv4[\s_-]?uni[\s_-]?cast\b": "ipv4Unicast",
+ REGEXP_BGP_L2VPN_AFI: "l2VpnEvpn",
+ REGEXP_BGP_IPV4_MPLS_LABELS: "ipv4MplsLabels",
+ REGEX_BGP_IPV4_MPLS_VPN: "ipv4MplsVpn",
+ REGEX_BGP_IPV4_UNICAST: "ipv4Unicast",
}
for pattern, replacement in patterns.items():
@@ -81,6 +102,16 @@ def bgp_multiprotocol_capabilities_abbreviations(value: str) -> str:
return value
+def validate_regex(value: str) -> str:
+ """Validate that the input value is a valid regex format."""
+ try:
+ re.compile(value)
+ except re.error as e:
+ msg = f"Invalid regex: {e}"
+ raise ValueError(msg) from e
+ return value
+
+
# ANTA framework
TestStatus = Literal["unset", "success", "failure", "error", "skipped"]
@@ -91,13 +122,19 @@ MlagPriority = Annotated[int, Field(ge=1, le=32767)]
Vni = Annotated[int, Field(ge=1, le=16777215)]
Interface = Annotated[
str,
- Field(pattern=r"^(Dps|Ethernet|Fabric|Loopback|Management|Port-Channel|Tunnel|Vlan|Vxlan)[0-9]+(\/[0-9]+)*(\.[0-9]+)?$"),
+ Field(pattern=REGEXP_TYPE_EOS_INTERFACE),
+ BeforeValidator(interface_autocomplete),
+ BeforeValidator(interface_case_sensitivity),
+]
+EthernetInterface = Annotated[
+ str,
+ Field(pattern=r"^Ethernet[0-9]+(\/[0-9]+)*$"),
BeforeValidator(interface_autocomplete),
BeforeValidator(interface_case_sensitivity),
]
VxlanSrcIntf = Annotated[
str,
- Field(pattern=r"^(Loopback)([0-9]|[1-9][0-9]{1,2}|[1-7][0-9]{3}|8[01][0-9]{2}|819[01])$"),
+ Field(pattern=REGEXP_TYPE_VXLAN_SRC_INTERFACE),
BeforeValidator(interface_autocomplete),
BeforeValidator(interface_case_sensitivity),
]
@@ -105,7 +142,7 @@ Afi = Literal["ipv4", "ipv6", "vpn-ipv4", "vpn-ipv6", "evpn", "rt-membership", "
Safi = Literal["unicast", "multicast", "labeled-unicast", "sr-te"]
EncryptionAlgorithm = Literal["RSA", "ECDSA"]
RsaKeySize = Literal[2048, 3072, 4096]
-EcdsaKeySize = Literal[256, 384, 521]
+EcdsaKeySize = Literal[256, 384, 512]
MultiProtocolCaps = Annotated[str, BeforeValidator(bgp_multiprotocol_capabilities_abbreviations)]
BfdInterval = Annotated[int, Field(ge=50, le=60000)]
BfdMultiplier = Annotated[int, Field(ge=3, le=50)]
@@ -127,5 +164,6 @@ ErrDisableInterval = Annotated[int, Field(ge=30, le=86400)]
Percent = Annotated[float, Field(ge=0.0, le=100.0)]
PositiveInteger = Annotated[int, Field(ge=0)]
Revision = Annotated[int, Field(ge=1, le=99)]
-Hostname = Annotated[str, Field(pattern=r"^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$")]
+Hostname = Annotated[str, Field(pattern=REGEXP_TYPE_HOSTNAME)]
Port = Annotated[int, Field(ge=1, le=65535)]
+RegexString = Annotated[str, AfterValidator(validate_regex)]