summaryrefslogtreecommitdiffstats
path: root/util.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 00:30:09 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 00:30:09 +0000
commit4dd6e896edac1096089a9cd54296266436ffec03 (patch)
treea3da8663c704704177ad944ae5a680c302f29f55 /util.py
parentInitial commit. (diff)
downloadruamel.yaml-4dd6e896edac1096089a9cd54296266436ffec03.tar.xz
ruamel.yaml-4dd6e896edac1096089a9cd54296266436ffec03.zip
Adding upstream version 0.18.5.upstream/0.18.5
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'util.py')
-rw-r--r--util.py257
1 files changed, 257 insertions, 0 deletions
diff --git a/util.py b/util.py
new file mode 100644
index 0000000..b621ce0
--- /dev/null
+++ b/util.py
@@ -0,0 +1,257 @@
+# coding: utf-8
+
+"""
+some helper functions that might be generally useful
+"""
+
+import datetime
+from functools import partial
+import re
+
+
+from typing import Any, Dict, Optional, List, Text, Callable, Union # NOQA
+from .compat import StreamTextType # NOQA
+
+
+class LazyEval:
+ """
+ Lightweight wrapper around lazily evaluated func(*args, **kwargs).
+
+ func is only evaluated when any attribute of its return value is accessed.
+ Every attribute access is passed through to the wrapped value.
+ (This only excludes special cases like method-wrappers, e.g., __hash__.)
+ The sole additional attribute is the lazy_self function which holds the
+ return value (or, prior to evaluation, func and arguments), in its closure.
+ """
+
+ def __init__(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
+ def lazy_self() -> Any:
+ return_value = func(*args, **kwargs)
+ object.__setattr__(self, 'lazy_self', lambda: return_value)
+ return return_value
+
+ object.__setattr__(self, 'lazy_self', lazy_self)
+
+ def __getattribute__(self, name: str) -> Any:
+ lazy_self = object.__getattribute__(self, 'lazy_self')
+ if name == 'lazy_self':
+ return lazy_self
+ return getattr(lazy_self(), name)
+
+ def __setattr__(self, name: str, value: Any) -> None:
+ setattr(self.lazy_self(), name, value)
+
+
+RegExp = partial(LazyEval, re.compile)
+
+timestamp_regexp = RegExp(
+ """^(?P<year>[0-9][0-9][0-9][0-9])
+ -(?P<month>[0-9][0-9]?)
+ -(?P<day>[0-9][0-9]?)
+ (?:((?P<t>[Tt])|[ \\t]+) # explictly not retaining extra spaces
+ (?P<hour>[0-9][0-9]?)
+ :(?P<minute>[0-9][0-9])
+ :(?P<second>[0-9][0-9])
+ (?:\\.(?P<fraction>[0-9]*))?
+ (?:[ \\t]*(?P<tz>Z|(?P<tz_sign>[-+])(?P<tz_hour>[0-9][0-9]?)
+ (?::(?P<tz_minute>[0-9][0-9]))?))?)?$""",
+ re.X,
+)
+
+
+def create_timestamp(
+ year: Any,
+ month: Any,
+ day: Any,
+ t: Any,
+ hour: Any,
+ minute: Any,
+ second: Any,
+ fraction: Any,
+ tz: Any,
+ tz_sign: Any,
+ tz_hour: Any,
+ tz_minute: Any,
+) -> Union[datetime.datetime, datetime.date]:
+ # create a timestamp from match against timestamp_regexp
+ MAX_FRAC = 999999
+ year = int(year)
+ month = int(month)
+ day = int(day)
+ if not hour:
+ return datetime.date(year, month, day)
+ hour = int(hour)
+ minute = int(minute)
+ second = int(second)
+ frac = 0
+ if fraction:
+ frac_s = fraction[:6]
+ while len(frac_s) < 6:
+ frac_s += '0'
+ frac = int(frac_s)
+ if len(fraction) > 6 and int(fraction[6]) > 4:
+ frac += 1
+ if frac > MAX_FRAC:
+ fraction = 0
+ else:
+ fraction = frac
+ else:
+ fraction = 0
+ delta = None
+ if tz_sign:
+ tz_hour = int(tz_hour)
+ tz_minute = int(tz_minute) if tz_minute else 0
+ delta = datetime.timedelta(
+ hours=tz_hour, minutes=tz_minute, seconds=1 if frac > MAX_FRAC else 0,
+ )
+ if tz_sign == '-':
+ delta = -delta
+ elif frac > MAX_FRAC:
+ delta = -datetime.timedelta(seconds=1)
+ # should do something else instead (or hook this up to the preceding if statement
+ # in reverse
+ # if delta is None:
+ # return datetime.datetime(year, month, day, hour, minute, second, fraction)
+ # return datetime.datetime(year, month, day, hour, minute, second, fraction,
+ # datetime.timezone.utc)
+ # the above is not good enough though, should provide tzinfo. In Python3 that is easily
+ # doable drop that kind of support for Python2 as it has not native tzinfo
+ data = datetime.datetime(year, month, day, hour, minute, second, fraction)
+ if delta:
+ data -= delta
+ return data
+
+
+# originally as comment
+# https://github.com/pre-commit/pre-commit/pull/211#issuecomment-186466605
+# if you use this in your code, I suggest adding a test in your test suite
+# that check this routines output against a known piece of your YAML
+# before upgrades to this code break your round-tripped YAML
+def load_yaml_guess_indent(stream: StreamTextType, **kw: Any) -> Any:
+ """guess the indent and block sequence indent of yaml stream/string
+
+ returns round_trip_loaded stream, indent level, block sequence indent
+ - block sequence indent is the number of spaces before a dash relative to previous indent
+ - if there are no block sequences, indent is taken from nested mappings, block sequence
+ indent is unset (None) in that case
+ """
+ from .main import YAML
+
+ # load a YAML document, guess the indentation, if you use TABs you are on your own
+ def leading_spaces(line: Any) -> int:
+ idx = 0
+ while idx < len(line) and line[idx] == ' ':
+ idx += 1
+ return idx
+
+ if isinstance(stream, str):
+ yaml_str: Any = stream
+ elif isinstance(stream, bytes):
+ # most likely, but the Reader checks BOM for this
+ yaml_str = stream.decode('utf-8')
+ else:
+ yaml_str = stream.read()
+ map_indent = None
+ indent = None # default if not found for some reason
+ block_seq_indent = None
+ prev_line_key_only = None
+ key_indent = 0
+ for line in yaml_str.splitlines():
+ rline = line.rstrip()
+ lline = rline.lstrip()
+ if lline.startswith('- '):
+ l_s = leading_spaces(line)
+ block_seq_indent = l_s - key_indent
+ idx = l_s + 1
+ while line[idx] == ' ': # this will end as we rstripped
+ idx += 1
+ if line[idx] == '#': # comment after -
+ continue
+ indent = idx - key_indent
+ break
+ if map_indent is None and prev_line_key_only is not None and rline:
+ idx = 0
+ while line[idx] in ' -':
+ idx += 1
+ if idx > prev_line_key_only:
+ map_indent = idx - prev_line_key_only
+ if rline.endswith(':'):
+ key_indent = leading_spaces(line)
+ idx = 0
+ while line[idx] == ' ': # this will end on ':'
+ idx += 1
+ prev_line_key_only = idx
+ continue
+ prev_line_key_only = None
+ if indent is None and map_indent is not None:
+ indent = map_indent
+ yaml = YAML()
+ return yaml.load(yaml_str, **kw), indent, block_seq_indent
+
+
+def configobj_walker(cfg: Any) -> Any:
+ """
+ walks over a ConfigObj (INI file with comments) generating
+ corresponding YAML output (including comments
+ """
+ from configobj import ConfigObj # type: ignore
+
+ assert isinstance(cfg, ConfigObj)
+ for c in cfg.initial_comment:
+ if c.strip():
+ yield c
+ for s in _walk_section(cfg):
+ if s.strip():
+ yield s
+ for c in cfg.final_comment:
+ if c.strip():
+ yield c
+
+
+def _walk_section(s: Any, level: int = 0) -> Any:
+ from configobj import Section
+
+ assert isinstance(s, Section)
+ indent = ' ' * level
+ for name in s.scalars:
+ for c in s.comments[name]:
+ yield indent + c.strip()
+ x = s[name]
+ if '\n' in x:
+ i = indent + ' '
+ x = '|\n' + i + x.strip().replace('\n', '\n' + i)
+ elif ':' in x:
+ x = "'" + x.replace("'", "''") + "'"
+ line = f'{indent}{name}: {x}'
+ c = s.inline_comments[name]
+ if c:
+ line += ' ' + c
+ yield line
+ for name in s.sections:
+ for c in s.comments[name]:
+ yield indent + c.strip()
+ line = f'{indent}{name}:'
+ c = s.inline_comments[name]
+ if c:
+ line += ' ' + c
+ yield line
+ for val in _walk_section(s[name], level=level + 1):
+ yield val
+
+
+# def config_obj_2_rt_yaml(cfg):
+# from .comments import CommentedMap, CommentedSeq
+# from configobj import ConfigObj
+# assert isinstance(cfg, ConfigObj)
+# #for c in cfg.initial_comment:
+# # if c.strip():
+# # pass
+# cm = CommentedMap()
+# for name in s.sections:
+# cm[name] = d = CommentedMap()
+#
+#
+# #for c in cfg.final_comment:
+# # if c.strip():
+# # yield c
+# return cm