summaryrefslogtreecommitdiffstats
path: root/src/debputy/manifest_parser/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/debputy/manifest_parser/util.py')
-rw-r--r--src/debputy/manifest_parser/util.py314
1 files changed, 314 insertions, 0 deletions
diff --git a/src/debputy/manifest_parser/util.py b/src/debputy/manifest_parser/util.py
new file mode 100644
index 0000000..1600a90
--- /dev/null
+++ b/src/debputy/manifest_parser/util.py
@@ -0,0 +1,314 @@
+import dataclasses
+from typing import (
+ Iterator,
+ Union,
+ Self,
+ Optional,
+ List,
+ Tuple,
+ Mapping,
+ get_origin,
+ get_args,
+ Any,
+ Type,
+ TypeVar,
+ TYPE_CHECKING,
+)
+
+if TYPE_CHECKING:
+ from debputy.manifest_parser.declarative_parser import DebputyParseHint
+
+
+MP = TypeVar("MP", bound="DebputyParseHint")
+StrOrInt = Union[str, int]
+AttributePathAliasMapping = Mapping[
+ StrOrInt, Tuple[StrOrInt, Optional["AttributePathAliasMapping"]]
+]
+
+
+class AttributePath(object):
+ __slots__ = ("parent", "name", "alias_mapping", "path_hint")
+
+ def __init__(
+ self,
+ parent: Optional["AttributePath"],
+ key: Optional[Union[str, int]],
+ *,
+ alias_mapping: Optional[AttributePathAliasMapping] = None,
+ ) -> None:
+ self.parent = parent
+ self.name = key
+ self.path_hint: Optional[str] = None
+ self.alias_mapping = alias_mapping
+
+ @classmethod
+ def root_path(cls) -> "AttributePath":
+ return AttributePath(None, None)
+
+ @classmethod
+ def builtin_path(cls) -> "AttributePath":
+ return AttributePath(None, "$builtin$")
+
+ @classmethod
+ def test_path(cls) -> "AttributePath":
+ return AttributePath(None, "$test$")
+
+ def __bool__(self) -> bool:
+ return self.name is not None or self.parent is not None
+
+ def copy_with_path_hint(self, path_hint: str) -> "AttributePath":
+ p = self.__class__(self.parent, self.name, alias_mapping=self.alias_mapping)
+ p.path_hint = path_hint
+ return p
+
+ @property
+ def path(self) -> str:
+ segments = list(self._iter_path())
+ segments.reverse()
+ parts: List[str] = []
+ path_hint = None
+
+ for s in segments:
+ k = s.name
+ s_path_hint = s.path_hint
+ if s_path_hint is not None:
+ path_hint = s_path_hint
+ if isinstance(k, int):
+ parts.append(f"[{k}]")
+ elif k is not None:
+ if parts:
+ parts.append(".")
+ parts.append(k)
+ if path_hint:
+ parts.append(f" <Search for: {path_hint}>")
+ if not parts:
+ return "document root"
+ return "".join(parts)
+
+ def __str__(self) -> str:
+ return self.path
+
+ def __getitem__(self, item: Union[str, int]) -> "AttributePath":
+ alias_mapping = None
+ if self.alias_mapping:
+ match = self.alias_mapping.get(item)
+ if match:
+ item, alias_mapping = match
+ if item == "":
+ # Support `sources[0]` mapping to `source` by `sources -> source` and `0 -> ""`.
+ return AttributePath(
+ self.parent, self.name, alias_mapping=alias_mapping
+ )
+ return AttributePath(self, item, alias_mapping=alias_mapping)
+
+ def _iter_path(self) -> Iterator["AttributePath"]:
+ current = self
+ yield current
+ while True:
+ parent = current.parent
+ if not parent:
+ break
+ current = parent
+ yield current
+
+
+@dataclasses.dataclass(slots=True, frozen=True)
+class _SymbolicModeSegment:
+ base_mode: int
+ base_mask: int
+ cap_x_mode: int
+ cap_x_mask: int
+
+ def apply(self, current_mode: int, is_dir: bool) -> int:
+ if current_mode & 0o111 or is_dir:
+ chosen_mode = self.cap_x_mode
+ mode_mask = self.cap_x_mask
+ else:
+ chosen_mode = self.base_mode
+ mode_mask = self.base_mask
+ # set ("="): mode mask clears relevant segment and current_mode are the desired bits
+ # add ("+"): mode mask keeps everything and current_mode are the desired bits
+ # remove ("-"): mode mask clears relevant bits and current_mode are 0
+ return (current_mode & mode_mask) | chosen_mode
+
+
+def _symbolic_mode_bit_inverse(v: int) -> int:
+ # The & part is necessary because otherwise python narrows the inversion to the minimum number of bits
+ # required, which is not what we want.
+ return ~v & 0o7777
+
+
+def parse_symbolic_mode(
+ symbolic_mode: str,
+ attribute_path: Optional[AttributePath],
+) -> Iterator[_SymbolicModeSegment]:
+ sticky_bit = 0o01000
+ setuid_bit = 0o04000
+ setgid_bit = 0o02000
+ mode_group_flag = 0o7
+ subject_mask_and_shift = {
+ "u": (mode_group_flag << 6, 6),
+ "g": (mode_group_flag << 3, 3),
+ "o": (mode_group_flag << 0, 0),
+ }
+ bits = {
+ "r": (0o4, 0o4),
+ "w": (0o2, 0o2),
+ "x": (0o1, 0o1),
+ "X": (0o0, 0o1),
+ "s": (0o0, 0o0), # Special-cased below (it depends on the subject)
+ "t": (0o0, 0o0), # Special-cased below
+ }
+ modifiers = {
+ "+",
+ "-",
+ "=",
+ }
+ in_path = f" in {attribute_path.path}" if attribute_path is not None else ""
+ for orig_part in symbolic_mode.split(","):
+ base_mode = 0
+ cap_x_mode = 0
+ part = orig_part
+ subjects = set()
+ while part and part[0] in ("u", "g", "o", "a"):
+ subject = part[0]
+ if subject == "a":
+ subjects = {"u", "g", "o"}
+ else:
+ subjects.add(subject)
+ part = part[1:]
+ if not subjects:
+ subjects = {"u", "g", "o"}
+
+ if part and part[0] in modifiers:
+ modifier = part[0]
+ elif not part:
+ raise ValueError(
+ f'Invalid symbolic mode{in_path}: expected [+-=] to be present (from "{orig_part}")'
+ )
+ else:
+ raise ValueError(
+ f'Invalid symbolic mode{in_path}: Expected "{part[0]}" to be one of [+-=]'
+ f' (from "{orig_part}")'
+ )
+ part = part[1:]
+ s_bit_seen = False
+ t_bit_seen = False
+ while part and part[0] in bits:
+ if part == "s":
+ s_bit_seen = True
+ elif part == "t":
+ t_bit_seen = True
+ elif part in ("u", "g", "o"):
+ raise NotImplementedError(
+ f"Cannot parse symbolic mode{in_path}: Sorry, we do not support referencing an"
+ " existing subject's permissions (a=u) in symbolic modes."
+ )
+ else:
+ matched_bits = bits.get(part[0])
+ if matched_bits is None:
+ valid_bits = "".join(bits)
+ raise ValueError(
+ f'Invalid symbolic mode{in_path}: Expected "{part[0]}" to be one of the letters'
+ f' in "{valid_bits}" (from "{orig_part}")'
+ )
+ base_mode_bits, cap_x_mode_bits = bits[part[0]]
+ base_mode |= base_mode_bits
+ cap_x_mode |= cap_x_mode_bits
+ part = part[1:]
+
+ if part:
+ raise ValueError(
+ f'Invalid symbolic mode{in_path}: Could not parse "{part[0]}" from "{orig_part}"'
+ )
+
+ final_base_mode = 0
+ final_cap_x_mode = 0
+ segment_mask = 0
+ for subject in subjects:
+ mask, shift = subject_mask_and_shift[subject]
+ segment_mask |= mask
+ final_base_mode |= base_mode << shift
+ final_cap_x_mode |= cap_x_mode << shift
+ if modifier == "=":
+ segment_mask |= setuid_bit if "u" in subjects else 0
+ segment_mask |= setgid_bit if "g" in subjects else 0
+ segment_mask |= sticky_bit if "o" in subjects else 0
+ if s_bit_seen:
+ if "u" in subjects:
+ final_base_mode |= setuid_bit
+ final_cap_x_mode |= setuid_bit
+ if "g" in subjects:
+ final_base_mode |= setgid_bit
+ final_cap_x_mode |= setgid_bit
+ if t_bit_seen:
+ final_base_mode |= sticky_bit
+ final_cap_x_mode |= sticky_bit
+ if modifier == "+":
+ final_base_mask = ~0
+ final_cap_x_mask = ~0
+ elif modifier == "-":
+ final_base_mask = _symbolic_mode_bit_inverse(final_base_mode)
+ final_cap_x_mask = _symbolic_mode_bit_inverse(final_cap_x_mode)
+ final_base_mode = 0
+ final_cap_x_mode = 0
+ elif modifier == "=":
+ # FIXME: Handle "unmentioned directory's setgid/setuid bits"
+ inverted_mask = _symbolic_mode_bit_inverse(segment_mask)
+ final_base_mask = inverted_mask
+ final_cap_x_mask = inverted_mask
+ else:
+ raise AssertionError(
+ f"Unknown modifier in symbolic mode: {modifier} - should not have happened"
+ )
+ yield _SymbolicModeSegment(
+ base_mode=final_base_mode,
+ base_mask=final_base_mask,
+ cap_x_mode=final_cap_x_mode,
+ cap_x_mask=final_cap_x_mask,
+ )
+
+
+def unpack_type(
+ orig_type: Any,
+ parsing_typed_dict_attribute: bool,
+) -> Tuple[Any, Optional[Any], Tuple[Any, ...]]:
+ raw_type = orig_type
+ origin = get_origin(raw_type)
+ args = get_args(raw_type)
+ if not parsing_typed_dict_attribute and repr(origin) in (
+ "typing.NotRequired",
+ "typing.Required",
+ ):
+ raise ValueError(
+ f"The Required/NotRequired attributes cannot be used outside typed dicts,"
+ f" the type that triggered the error: {orig_type}"
+ )
+
+ while repr(origin) in ("typing.NotRequired", "typing.Required"):
+ if len(args) != 1:
+ raise ValueError(
+ f"The type {raw_type} should have exactly one type parameter"
+ )
+ raw_type = args[0]
+ origin = get_origin(raw_type)
+ args = get_args(raw_type)
+
+ assert not isinstance(raw_type, tuple)
+
+ return raw_type, origin, args
+
+
+def find_annotation(
+ annotations: Tuple[Any, ...],
+ anno_class: Type[MP],
+) -> Optional[MP]:
+ m = None
+ for anno in annotations:
+ if isinstance(anno, anno_class):
+ if m is not None:
+ raise ValueError(
+ f"The annotation {anno_class.__name__} was used more than once"
+ )
+ m = anno
+ return m