summaryrefslogtreecommitdiffstats
path: root/util.py
blob: 17cb2d6f5ab8535c18af362a65ca72a083536c25 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
from __future__ import annotations

"""
some helper functions that might be generally useful
"""

import datetime
from functools import partial
import re


if False:  # MYPY
    from typing import Any, Dict, Optional, List, Text, Callable, Union  # NOQA
    from .compat import StreamTextType  # NOQA


class LazyEval:
    """
    Proxy that postpones calling func(*args, **kwargs) until it is needed.

    The call happens the first time any attribute other than lazy_self is
    looked up or set on the proxy; from then on the same result object is
    reused.  Attribute reads and writes are forwarded to that result.
    (Special methods that Python looks up on the type, such as __hash__,
    are not forwarded.)
    The only attribute stored on the proxy itself is lazy_self, a callable
    returning the result; its closure holds func and the arguments before
    evaluation and the result afterwards.
    """

    def __init__(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        def lazy_self() -> Any:
            value = func(*args, **kwargs)
            # swap this function for one that just hands back the computed value,
            # so func is called at most once
            object.__setattr__(self, 'lazy_self', lambda: value)
            return value

        # go through object.__setattr__: our own __setattr__ forwards to the result
        object.__setattr__(self, 'lazy_self', lazy_self)

    def __getattribute__(self, name: str) -> Any:
        thunk = object.__getattribute__(self, 'lazy_self')
        # lazy_self itself is handed out without triggering evaluation
        return thunk if name == 'lazy_self' else getattr(thunk(), name)

    def __setattr__(self, name: str, value: Any) -> None:
        wrapped = self.lazy_self()
        setattr(wrapped, name, value)


# RegExp(pattern, flags) wraps the re.compile call in a LazyEval, so the pattern
# is only compiled when an attribute of the result (e.g. .match) is first accessed
RegExp = partial(LazyEval, re.compile)

# verbose (re.X) pattern for a date that is optionally followed by a time, fractional
# seconds and a timezone ('Z' or a signed hour[:minute] offset); the names of the
# groups are the same as the parameter names of create_timestamp
timestamp_regexp = RegExp(
    """^(?P<year>[0-9][0-9][0-9][0-9])
       -(?P<month>[0-9][0-9]?)
       -(?P<day>[0-9][0-9]?)
       (?:((?P<t>[Tt])|[ \\t]+)   # explictly not retaining extra spaces
       (?P<hour>[0-9][0-9]?)
       :(?P<minute>[0-9][0-9])
       :(?P<second>[0-9][0-9])
       (?:\\.(?P<fraction>[0-9]*))?
        (?:[ \\t]*(?P<tz>Z|(?P<tz_sign>[-+])(?P<tz_hour>[0-9][0-9]?)
       (?::(?P<tz_minute>[0-9][0-9]))?))?)?$""",
    re.X,
)


def create_timestamp(
    year: Any,
    month: Any,
    day: Any,
    t: Any,
    hour: Any,
    minute: Any,
    second: Any,
    fraction: Any,
    tz: Any,
    tz_sign: Any,
    tz_hour: Any,
    tz_minute: Any,
) -> Union[datetime.datetime, datetime.date]:
    """
    create a timestamp from the named groups of a match against timestamp_regexp

    - without an hour a datetime.date is returned, otherwise a datetime.datetime
    - fraction is the string of digits after the decimal point: the first six
      digits give the microseconds (zero padded on the right), a seventh digit
      of 5 or more rounds up, if necessary carrying into the seconds
    - with a tz_sign the result is timezone aware with the given offset, with
      tz equal to 'Z' the offset is zero; in both cases tz is used as the
      timezone name. Without either the result is naive
    - t (the date/time separator) is accepted but not used
    """
    year, month, day = int(year), int(month), int(day)
    if hour is None:
        # date only
        return datetime.date(year, month, day)
    hour, minute, second = int(hour), int(minute), int(second)

    microsecond = 0
    carry_second = False
    if fraction:
        micro = int(fraction[:6].ljust(6, '0'))
        if len(fraction) > 6 and int(fraction[6]) > 4:
            micro += 1  # round half up on the seventh digit
        # .9999995 and up rounds to a whole second: zero the microseconds here
        # and add the second once the datetime has been constructed
        carry_second = micro > 999999
        microsecond = 0 if carry_second else micro

    tzinfo = None
    if tz_sign:
        offset = datetime.timedelta(
            hours=int(tz_hour), minutes=int(tz_minute) if tz_minute else 0,
        )
        tzinfo = datetime.timezone(-offset if tz_sign == '-' else offset, name=tz)
    elif tz == 'Z':
        tzinfo = datetime.timezone(datetime.timedelta(hours=0), name=tz)

    stamp = datetime.datetime(year, month, day, hour, minute, second, microsecond, tzinfo)
    if carry_second:
        # adding a timedelta lets minute/hour/day roll over correctly
        stamp += datetime.timedelta(seconds=1)
    return stamp


# originally posted as a comment at
# https://github.com/pre-commit/pre-commit/pull/211#issuecomment-186466605
# if you use this in your code, I suggest adding a test to your test suite
# that checks this routine's output against a known piece of your YAML,
# before upgrades to this code break your round-tripped YAML
def load_yaml_guess_indent(stream: StreamTextType, **kw: Any) -> Any:
    """guess the indent and block sequence indent of yaml stream/string

    stream can be a str, bytes (decoded as UTF-8) or an object with a read() method,
    keyword arguments are handed to the load() method of a YAML() instance

    returns round_trip_loaded stream, indent level, block sequence indent
    - block sequence indent is the number of spaces before a dash relative to previous indent
    - if there are no block sequences, indent is taken from nested mappings, block sequence
      indent is unset (None) in that case
    - indent is None if neither a block sequence nor a nested mapping was found
    """
    from .main import YAML

    # load a YAML document, guess the indentation, if you use TABs you are on your own
    def leading_spaces(line: Any) -> int:
        return len(line) - len(line.lstrip(' '))

    if isinstance(stream, str):
        yaml_str: Any = stream
    elif isinstance(stream, bytes):
        # most likely, but the Reader checks BOM for this
        yaml_str = stream.decode('utf-8')
    else:
        yaml_str = stream.read()
    map_indent = None
    indent = None  # default if not found for some reason
    block_seq_indent = None
    prev_line_key_only = None  # indent of the previous line if that was a key without value
    key_indent = 0  # indent of the last line seen that ended in ':'
    for line in yaml_str.splitlines():
        rline = line.rstrip()
        lline = rline.lstrip()
        if lline.startswith('- '):
            # first block sequence entry with content decides both values
            l_s = leading_spaces(line)
            block_seq_indent = l_s - key_indent
            idx = l_s + 1
            while line[idx] == ' ':  # this will end as we rstripped
                idx += 1
            if line[idx] == '#':  # comment after -
                continue
            indent = idx - key_indent
            break
        if map_indent is None and prev_line_key_only is not None and rline:
            # line following a key-only line: find the column where its content starts.
            # The scan has to be bounded: a line made up of only spaces and dashes
            # (a '-' entry with its value on the next line, a '---' marker) has no
            # such column and used to raise an IndexError here
            idx = 0
            while idx < len(line) and line[idx] in ' -':
                idx += 1
            if idx < len(line) and idx > prev_line_key_only:
                map_indent = idx - prev_line_key_only
        if rline.endswith(':'):
            # key without a value on the same line, remember its indent
            key_indent = leading_spaces(line)
            prev_line_key_only = key_indent
            continue
        prev_line_key_only = None
    if indent is None and map_indent is not None:
        indent = map_indent
    yaml = YAML()
    return yaml.load(yaml_str, **kw), indent, block_seq_indent


def configobj_walker(cfg: Any) -> Any:
    """
    generate the lines of YAML output (including comments) corresponding
    to a ConfigObj (INI file with comments)

    the initial comment lines come first, then the lines for the sections
    and finally the final comment lines; lines that are empty or consist
    of whitespace only are left out
    """
    from configobj import ConfigObj  # type: ignore

    assert isinstance(cfg, ConfigObj)
    yield from (text for text in cfg.initial_comment if text.strip())
    yield from (text for text in _walk_section(cfg) if text.strip())
    yield from (text for text in cfg.final_comment if text.strip())


def _walk_section(s: Any, level: int = 0) -> Any:
    """
    generate the YAML lines for configobj Section s, indented two spaces per level

    the scalars of the section come first, then its subsections (recursively,
    one level deeper); each entry is preceded by its comment lines and gets
    its inline comment, if any, appended
    """
    from configobj import Section

    assert isinstance(s, Section)
    pad = '  ' * level
    for key in s.scalars:
        for comment in s.comments[key]:
            yield pad + comment.strip()
        value = s[key]
        if '\n' in value:
            # multi-line value: emit as a literal block scalar, one level deeper
            inner = pad + '  '
            value = '|\n' + inner + value.strip().replace('\n', '\n' + inner)
        elif ':' in value:
            # single quote the value, doubling any embedded single quotes
            value = "'" + value.replace("'", "''") + "'"
        entry = f'{pad}{key}: {value}'
        trailing = s.inline_comments[key]
        if trailing:
            entry += ' ' + trailing
        yield entry
    for key in s.sections:
        for comment in s.comments[key]:
            yield pad + comment.strip()
        header = f'{pad}{key}:'
        trailing = s.inline_comments[key]
        if trailing:
            header += ' ' + trailing
        yield header
        yield from _walk_section(s[key], level=level + 1)


# def config_obj_2_rt_yaml(cfg):
#     from .comments import CommentedMap, CommentedSeq
#     from configobj import ConfigObj
#     assert isinstance(cfg, ConfigObj)
#     #for c in cfg.initial_comment:
#     #    if c.strip():
#     #        pass
#     cm = CommentedMap()
#     for name in s.sections:
#         cm[name] = d = CommentedMap()
#
#
#     #for c in cfg.final_comment:
#     #    if c.strip():
#     #        yield c
#     return cm