diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-14 20:19:53 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-14 20:19:53 +0000 |
commit | e7ee850d46d54789979bf0c5244bae1825fb7149 (patch) | |
tree | 6e94ed55df9ec749682a3c792ce752d07892b968 /lib/ruyaml/util.py | |
parent | Initial commit. (diff) | |
download | python-ruyaml-e7ee850d46d54789979bf0c5244bae1825fb7149.tar.xz python-ruyaml-e7ee850d46d54789979bf0c5244bae1825fb7149.zip |
Adding upstream version 0.91.0.upstream/0.91.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | lib/ruyaml/util.py | 247 |
1 files changed, 247 insertions, 0 deletions
diff --git a/lib/ruyaml/util.py b/lib/ruyaml/util.py new file mode 100644 index 0000000..60e5293 --- /dev/null +++ b/lib/ruyaml/util.py @@ -0,0 +1,247 @@ +# coding: utf-8 + +""" +some helper functions that might be generally useful +""" + +import datetime +import re +from functools import partial +from typing import Any + +if False: # MYPY + from typing import Any, Dict, List, Optional, Text # NOQA + + from .compat import StreamTextType # NOQA + + +class LazyEval: + """ + Lightweight wrapper around lazily evaluated func(*args, **kwargs). + + func is only evaluated when any attribute of its return value is accessed. + Every attribute access is passed through to the wrapped value. + (This only excludes special cases like method-wrappers, e.g., __hash__.) + The sole additional attribute is the lazy_self function which holds the + return value (or, prior to evaluation, func and arguments), in its closure. + """ + + def __init__(self, func, *args, **kwargs): + # type: (Any, Any, Any) -> None + def lazy_self(): + # type: () -> Any + return_value = func(*args, **kwargs) + object.__setattr__(self, 'lazy_self', lambda: return_value) + return return_value + + object.__setattr__(self, 'lazy_self', lazy_self) + + def __getattribute__(self, name): + # type: (Any) -> Any + lazy_self = object.__getattribute__(self, 'lazy_self') + if name == 'lazy_self': + return lazy_self + return getattr(lazy_self(), name) + + def __setattr__(self, name, value): + # type: (Any, Any) -> None + setattr(self.lazy_self(), name, value) + + +RegExp = partial(LazyEval, re.compile) + +timestamp_regexp = RegExp( + """^(?P<year>[0-9][0-9][0-9][0-9]) + -(?P<month>[0-9][0-9]?) + -(?P<day>[0-9][0-9]?) + (?:((?P<t>[Tt])|[ \\t]+) # explictly not retaining extra spaces + (?P<hour>[0-9][0-9]?) + :(?P<minute>[0-9][0-9]) + :(?P<second>[0-9][0-9]) + (?:\\.(?P<fraction>[0-9]*))? + (?:[ \\t]*(?P<tz>Z|(?P<tz_sign>[-+])(?P<tz_hour>[0-9][0-9]?) + (?::(?P<tz_minute>[0-9][0-9]))?))?)?$""", + re.X, +) + + +def create_timestamp( + year, month, day, t, hour, minute, second, fraction, tz, tz_sign, tz_hour, tz_minute +): + year = int(year) + month = int(month) + day = int(day) + if not hour: + return datetime.date(year, month, day) + hour = int(hour) + minute = int(minute) + second = int(second) + if fraction: + frac = 0 + frac_s = fraction[:6] + while len(frac_s) < 6: + frac_s += '0' + frac = int(frac_s) + if len(fraction) > 6 and int(fraction[6]) > 4: + frac += 1 + fraction = frac + else: + fraction = 0 + delta = None + if tz_sign: + tz_hour = int(tz_hour) + tz_minute = int(tz_minute) if tz_minute else 0 + delta = datetime.timedelta(hours=tz_hour, minutes=tz_minute) + if tz_sign == '-': + delta = -delta + # should do something else instead (or hook this up to the preceding if statement + # in reverse + # if delta is None: + # return datetime.datetime(year, month, day, hour, minute, second, fraction) + # return datetime.datetime(year, month, day, hour, minute, second, fraction, + # datetime.timezone.utc) + # the above is not good enough though, should provide tzinfo. In Python3 that is easily + # doable drop that kind of support for Python2 as it has not native tzinfo + data = datetime.datetime(year, month, day, hour, minute, second, fraction) + if delta: + data -= delta + return data + + +# originally as comment +# https://github.com/pre-commit/pre-commit/pull/211#issuecomment-186466605 +# if you use this in your code, I suggest adding a test in your test suite +# that check this routines output against a known piece of your YAML +# before upgrades to this code break your round-tripped YAML +def load_yaml_guess_indent(stream): + # type: (StreamTextType) -> Any + """guess the indent and block sequence indent of yaml stream/string + + returns round_trip_loaded stream, indent level, block sequence indent + - block sequence indent is the number of spaces before a dash relative to previous indent + - if there are no block sequences, indent is taken from nested mappings, block sequence + indent is unset (None) in that case + """ + from .main import YAML + + # load a YAML document, guess the indentation, if you use TABs you are on your own + def leading_spaces(line): + # type: (Any) -> int + idx = 0 + while idx < len(line) and line[idx] == ' ': + idx += 1 + return idx + + if isinstance(stream, str): + yaml_str = stream # type: Any + elif isinstance(stream, bytes): + # most likely, but the Reader checks BOM for this + yaml_str = stream.decode('utf-8') + else: + yaml_str = stream.read() + map_indent = None + indent = None # default if not found for some reason + block_seq_indent = None + prev_line_key_only = None + key_indent = 0 + for line in yaml_str.splitlines(): + rline = line.rstrip() + lline = rline.lstrip() + if lline.startswith('- '): + l_s = leading_spaces(line) + block_seq_indent = l_s - key_indent + idx = l_s + 1 + while line[idx] == ' ': # this will end as we rstripped + idx += 1 + if line[idx] == '#': # comment after - + continue + indent = idx - key_indent + break + if map_indent is None and prev_line_key_only is not None and rline: + idx = 0 + while line[idx] in ' -': + idx += 1 + if idx > prev_line_key_only: + map_indent = idx - prev_line_key_only + if rline.endswith(':'): + key_indent = leading_spaces(line) + idx = 0 + while line[idx] == ' ': # this will end on ':' + idx += 1 + prev_line_key_only = idx + continue + prev_line_key_only = None + if indent is None and map_indent is not None: + indent = map_indent + yaml = YAML() + return yaml.load(yaml_str), indent, block_seq_indent # type: ignore + + +def configobj_walker(cfg): + # type: (Any) -> Any + """ + walks over a ConfigObj (INI file with comments) generating + corresponding YAML output (including comments + """ + from configobj import ConfigObj # type: ignore + + assert isinstance(cfg, ConfigObj) + for c in cfg.initial_comment: + if c.strip(): + yield c + for s in _walk_section(cfg): + if s.strip(): + yield s + for c in cfg.final_comment: + if c.strip(): + yield c + + +def _walk_section(s, level=0): + # type: (Any, int) -> Any + from configobj import Section + + assert isinstance(s, Section) + indent = ' ' * level + for name in s.scalars: + for c in s.comments[name]: + yield indent + c.strip() + x = s[name] + if '\n' in x: + i = indent + ' ' + x = '|\n' + i + x.strip().replace('\n', '\n' + i) + elif ':' in x: + x = "'" + x.replace("'", "''") + "'" + line = '{0}{1}: {2}'.format(indent, name, x) + c = s.inline_comments[name] + if c: + line += ' ' + c + yield line + for name in s.sections: + for c in s.comments[name]: + yield indent + c.strip() + line = '{0}{1}:'.format(indent, name) + c = s.inline_comments[name] + if c: + line += ' ' + c + yield line + for val in _walk_section(s[name], level=level + 1): + yield val + + +# def config_obj_2_rt_yaml(cfg): +# from .comments import CommentedMap, CommentedSeq +# from configobj import ConfigObj +# assert isinstance(cfg, ConfigObj) +# #for c in cfg.initial_comment: +# # if c.strip(): +# # pass +# cm = CommentedMap() +# for name in s.sections: +# cm[name] = d = CommentedMap() +# +# +# #for c in cfg.final_comment: +# # if c.strip(): +# # yield c +# return cm |