summaryrefslogtreecommitdiffstats
path: root/tests/topotests/munet/config.py
blob: 2870ae615c137b79a61ce7314801247a73c8445b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# -*- coding: utf-8 eval: (blacken-mode 1) -*-
# SPDX-License-Identifier: GPL-2.0-or-later
#
# June 25 2022, Christian Hopps <chopps@gmail.com>
#
# Copyright (c) 2021-2022, LabN Consulting, L.L.C.
#
"""A module that defines common configuration utility functions."""
import logging

from collections.abc import Iterable
from collections.abc import Mapping
from copy import deepcopy
from typing import overload


def find_with_kv(lst, k, v):
    """Return the first object in ``lst`` whose key ``k`` holds the value ``v``.

    Args:
      lst: a list of dictionary objects; may be empty or None.
      k: the key to look for in each object.
      v: the value the key must be equal to.

    Returns:
      The first matching object, or an empty dict if nothing matches.
    """
    candidates = (entry for entry in (lst or ()) if k in entry and entry[k] == v)
    return next(candidates, {})


def find_all_with_kv(lst, k, v):
    """Return every object in ``lst`` whose key ``k`` holds the value ``v``.

    Args:
      lst: a list of dictionary objects; may be empty or None.
      k: the key to look for in each object.
      v: the value the key must be equal to.

    Returns:
      A new list of the matching objects in their original order (possibly empty).
    """
    return [entry for entry in (lst or ()) if k in entry and entry[k] == v]


def find_matching_net_config(name, cconf, oconf):
    """Find the connection in ``oconf`` that corresponds to ``cconf``.

    The objects under ``oconf["connections"]`` whose ``"to"`` value equals ``name``
    are the candidates.  When ``cconf`` carries a non-empty ``"remote-name"`` the
    candidate whose ``"name"`` equals it is chosen, otherwise the first candidate.

    Args:
      name: the value to match against each connection's ``"to"`` key.
      cconf: the connection config, consulted for ``"remote-name"``.
      oconf: the config holding the ``"connections"`` list to search.

    Returns:
      The matching connection object, or an empty dict if there is none.
    """
    candidates = find_all_with_kv(oconf.get("connections", {}), "to", name)
    if not candidates:
        return {}

    remote_name = cconf.get("remote-name", None)
    if remote_name:
        return find_with_kv(candidates, "name", remote_name)

    return candidates[0]


def merge_using_key(a, b, k):
    """Merge the objects of ``b`` into a copy of list ``a``, matching on key ``k``.

    An object from ``b`` replaces the object in the result that has the same value
    for ``k``; objects with a value not yet present are appended.

    Args:
      a: the base list of dictionary objects (not modified).
      b: the list of dictionary objects to merge in.
      k: the key whose value identifies an object.

    Returns:
      A new list with the merged objects.
    """
    merged = list(a)

    # Remember where each key value lives in the result list.
    position = {}
    for idx, obj in enumerate(merged):
        position[obj[k]] = idx

    for obj in b:
        key_value = obj[k]
        idx = position.get(key_value)
        if idx is None:
            position[key_value] = len(merged)
            merged.append(obj)
        else:
            merged[idx] = obj
    return merged


def list_to_dict_with_key(lst, k):
    """Convert a YANG style list of objects to a dict of objects.

    Each object (dictionary) in the list is stored in the new dictionary under the
    value it holds for the key ``k``.  If several objects share the same value the
    last one wins.

    This only works for lists of objects which are keyed on a single contained value.

    Args:
      lst: a *list* of python dictionary objects; may be empty or None.
      k: the key contained in each dictionary object in the list.

    Returns:
      A dictionary of objects (dictionaries).
    """
    keyed = {}
    for obj in lst or []:
        keyed[obj[k]] = obj
    return keyed


def config_to_dict_with_key(c, ck, k):
    """Replace the list of objects at ``c[ck]`` with a dict of those objects.

    The list found at ``c[ck]`` (an empty list if ``ck`` is missing) is converted
    with :py:func:`list_to_dict_with_key` using the key ``k``, and the result is
    stored back into ``c`` under ``ck``.

    Args:
      c: config dictionary; modified in place.
      ck: The key identifying the list of objects from ``c``.
      k: The key to pass to :py:func:`list_to_dict_with_key`.

    Returns:
      The dictionary of objects (dictionaries) now stored at ``c[ck]``.
    """
    objects = c.get(ck, [])
    c[ck] = list_to_dict_with_key(objects, k)
    return c[ck]


@overload
def config_subst(config: str, **kwargs) -> str:
    ...


@overload
def config_subst(config: Iterable, **kwargs) -> Iterable:
    ...


def config_subst(config: Iterable, **kwargs) -> Iterable:
    """Recursively substitute ``%NAME%`` style variables in a config value.

    For every keyword argument ``name=value`` each occurrence of ``%NAME%`` (the
    upper-cased keyword name) in a string is replaced with ``str(value)``.  Mappings
    and other iterables are walked recursively; mapping keys are left untouched.

    The deprecated form ``%RUNDIR%/%NAME%`` is first rewritten to ``%RUNDIR%`` and a
    warning is logged.

    Args:
      config: a string, a mapping, another iterable, or any other value.
      **kwargs: the substitution variables.

    Returns:
      The substituted string; a new dict for a mapping; a new list for any other
      iterable; any other value is returned unchanged.
    """
    if isinstance(config, str):
        if "%RUNDIR%/%NAME%" in config:
            config = config.replace("%RUNDIR%/%NAME%", "%RUNDIR%")
            logging.warning(
                "config '%RUNDIR%/%NAME%' should be changed to '%RUNDIR%' only, "
                "converting automatically for now."
            )
        for name, value in kwargs.items():
            config = config.replace(f"%{name.upper()}%", str(value))
        return config

    # Decide mapping-vs-sequence by type rather than by probing ``config[k]``:
    # probing mistakes a list of small ints for a dict, raises IndexError for
    # larger ints, and hides KeyError/TypeError raised by nested values.
    if isinstance(config, Mapping):
        return {k: config_subst(v, **kwargs) for k, v in config.items()}
    if isinstance(config, Iterable):
        return [config_subst(x, **kwargs) for x in config]
    return config


def value_merge_deepcopy(s1, s2):
    """Merge values using deepcopy.

    Build a new dict that has exactly the keys of ``s1``.  For each key the value
    is a deepcopy of the value from ``s2`` if the key also exists in ``s2``,
    otherwise a deepcopy of the value from ``s1``.  Keys that exist only in ``s2``
    are not included in the result.

    Args:
      s1: the dict supplying the keys and the fallback values.
      s2: the dict whose values take precedence for keys shared with ``s1``.

    Returns:
      A new dict of deep-copied values.
    """
    return {key: deepcopy(s2[key] if key in s2 else val) for key, val in s1.items()}


def merge_kind_config(kconf, config):
    """Combine a kind config with a config that overrides it.

    The result starts as a deepcopy of ``kconf``.  For a key present in both:

    - if the key is not listed in ``kconf["merge"]`` the value from ``config`` wins;
    - otherwise a list value is extended with the ``config`` value, a dict value is
      updated with the ``config`` value, and any other value is replaced by it.

    Keys found only in ``config`` are then added.  Neither argument is modified.

    Args:
      kconf: the kind config; its optional ``"merge"`` entry lists the keys to merge.
      config: the config whose values override or extend those of ``kconf``.

    Returns:
      A new dict holding the combined config.
    """
    merge_keys = kconf.get("merge", [])
    overrides = deepcopy(config)
    merged = deepcopy(kconf)

    # Only values of existing keys are reassigned here, so the dict size is stable.
    for key, base in merged.items():
        if key not in overrides:
            continue

        should_merge = key in merge_keys
        incoming = overrides[key]
        if should_merge and isinstance(base, list):
            base.extend(incoming)
        elif should_merge and isinstance(base, dict):
            merged[key] = {**base, **incoming}
        else:
            merged[key] = incoming

    for key, incoming in overrides.items():
        if key not in merged:
            merged[key] = incoming
    return merged


def cli_opt_list(option_list):
    """Normalize a CLI option value into a list of its non-empty items.

    Args:
      option_list: a comma-separated string, an iterable of items, or a false
        value such as None or "".

    Returns:
      A new list of the truthy items; a string is split on "," first.  Items are
      not stripped of whitespace.
    """
    if not option_list:
        return []
    items = option_list.split(",") if isinstance(option_list, str) else option_list
    return [item for item in items if item]


def name_in_cli_opt_str(name, option_list):
    """Check whether ``name`` is selected by a CLI option value.

    Args:
      name: the name to look for.
      option_list: the option value, in any form accepted by ``cli_opt_list``.

    Returns:
      True if the normalized option list contains ``name`` or the word "all".
    """
    selected = cli_opt_list(option_list)
    if name in selected:
        return True
    return "all" in selected


class ConfigOptionsProxy:
    """Proxy options object to fill in for any missing pytest config.

    Wraps a config object (or nothing at all) so option lookups always work:
    without a config every lookup yields the supplied default and ``option``
    returns None for any attribute.
    """

    class DefNoneObject:
        """An object that returns None for any attribute access."""

        def __getattr__(self, attr):
            return None

    def __init__(self, pytestconfig=None):
        """Initialize the proxy.

        Args:
          pytestconfig: a config object providing ``option`` and ``getoption()``,
            another ``ConfigOptionsProxy`` (its wrapped config is reused), or None.
        """
        if isinstance(pytestconfig, ConfigOptionsProxy):
            # Unwrap instead of nesting proxies.  The wrapped config may itself be
            # None, so `option` must only be derived by the check below.
            self.config = pytestconfig.config
        else:
            self.config = pytestconfig
        if self.config:
            self.option = self.config.option
        else:
            self.option = ConfigOptionsProxy.DefNoneObject()

    def getoption(self, opt, default=None):
        """Return the value of option ``opt``.

        Args:
          opt: the option name passed to the wrapped config's ``getoption()``.
          default: value returned when the option cannot be resolved.

        Returns:
          The option value, or ``default`` when there is no wrapped config, the
          lookup raises ValueError, or the value is None.
        """
        if not self.config:
            return default

        try:
            value = self.config.getoption(opt)
            return value if value is not None else default
        except ValueError:
            return default

    def get_option(self, opt, default=None):
        """Alias for :py:meth:`getoption`."""
        return self.getoption(opt, default)

    def get_option_list(self, opt):
        """Return the value of option ``opt`` converted by ``cli_opt_list``."""
        value = self.get_option(opt, "")
        return cli_opt_list(value)

    def name_in_option_list(self, name, opt):
        """Return True if the list for option ``opt`` holds ``name`` or "all"."""
        optlist = self.get_option_list(opt)
        return "all" in optlist or name in optlist