from typing import (Any, Dict, Iterator, List, Optional, MutableMapping, Set, Text, Tuple, Type, TYPE_CHECKING, Union) from .item import ManifestItem if TYPE_CHECKING: # avoid actually importing these, they're only used by type comments from .manifest import Manifest TypeDataType = MutableMapping[Tuple[str, ...], Set[ManifestItem]] PathHashType = MutableMapping[Tuple[str, ...], str] class TypeData(TypeDataType): def __init__(self, m: "Manifest", type_cls: Type[ManifestItem]) -> None: """Dict-like object containing the TestItems for each test type. Loading an actual Item class for each test is unnecessarily slow, so this class allows lazy-loading of the test items. When the manifest is loaded we store the raw json corresponding to the test type, and only create an Item subclass when the test is accessed. In order to remain API-compatible with consumers that depend on getting an Item from iteration, we do egerly load all items when iterating over the class.""" self._manifest = m self._type_cls: Type[ManifestItem] = type_cls self._json_data: Dict[Text, Any] = {} self._data: Dict[Text, Any] = {} self._hashes: Dict[Tuple[Text, ...], Text] = {} self.hashes = PathHash(self) def _delete_node(self, data: Dict[Text, Any], key: Tuple[Text, ...]) -> None: """delete a path from a Dict data with a given key""" path = [] node = data for pathseg in key[:-1]: path.append((node, pathseg)) node = node[pathseg] if not isinstance(node, dict): raise KeyError(key) del node[key[-1]] while path: node, pathseg = path.pop() if len(node[pathseg]) == 0: del node[pathseg] else: break def __getitem__(self, key: Tuple[Text, ...]) -> Set[ManifestItem]: node: Union[Dict[Text, Any], Set[ManifestItem], List[Any]] = self._data for pathseg in key: if isinstance(node, dict) and pathseg in node: node = node[pathseg] else: break else: if isinstance(node, set): return node else: raise KeyError(key) node = self._json_data found = False for pathseg in key: if isinstance(node, dict) and pathseg in node: node = node[pathseg] else: break else: found = True if not found: raise KeyError(key) if not isinstance(node, list): raise KeyError(key) self._hashes[key] = node[0] data = set() path = "/".join(key) for test in node[1:]: manifest_item = self._type_cls.from_json(self._manifest, path, test) data.add(manifest_item) node = self._data assert isinstance(node, dict) for pathseg in key[:-1]: node = node.setdefault(pathseg, {}) assert isinstance(node, dict) assert key[-1] not in node node[key[-1]] = data self._delete_node(self._json_data, key) return data def __setitem__(self, key: Tuple[Text, ...], value: Set[ManifestItem]) -> None: try: self._delete_node(self._json_data, key) except KeyError: pass node = self._data for i, pathseg in enumerate(key[:-1]): node = node.setdefault(pathseg, {}) if not isinstance(node, dict): raise KeyError(f"{key!r} is a child of a test ({key[:i+1]!r})") node[key[-1]] = value def __delitem__(self, key: Tuple[Text, ...]) -> None: try: self._delete_node(self._data, key) except KeyError: self._delete_node(self._json_data, key) else: try: del self._hashes[key] except KeyError: pass def __iter__(self) -> Iterator[Tuple[Text, ...]]: """Iterator over keys in the TypeData in codepoint order""" data_node: Optional[Union[Dict[Text, Any], Set[ManifestItem]]] = self._data json_node: Optional[Union[Dict[Text, Any], List[Any]]] = self._json_data path: Tuple[Text, ...] = tuple() stack = [(data_node, json_node, path)] while stack: data_node, json_node, path = stack.pop() if isinstance(data_node, set) or isinstance(json_node, list): assert data_node is None or json_node is None yield path else: assert data_node is None or isinstance(data_node, dict) assert json_node is None or isinstance(json_node, dict) keys: Set[Text] = set() if data_node is not None: keys |= set(iter(data_node)) if json_node is not None: keys |= set(iter(json_node)) for key in sorted(keys, reverse=True): stack.append((data_node.get(key) if data_node is not None else None, json_node.get(key) if json_node is not None else None, path + (key,))) def __len__(self) -> int: count = 0 stack: List[Union[Dict[Text, Any], Set[ManifestItem]]] = [self._data] while stack: v = stack.pop() if isinstance(v, set): count += 1 else: stack.extend(v.values()) json_stack: List[Union[Dict[Text, Any], List[Any]]] = [self._json_data] while json_stack: json_v = json_stack.pop() if isinstance(json_v, list): count += 1 else: json_stack.extend(json_v.values()) return count def __nonzero__(self) -> bool: return bool(self._data) or bool(self._json_data) __bool__ = __nonzero__ def __contains__(self, key: Any) -> bool: # we provide our own impl of this to avoid calling __getitem__ and generating items for # those in self._json_data node = self._data for pathseg in key: if pathseg in node: node = node[pathseg] else: break else: return bool(isinstance(node, set)) node = self._json_data for pathseg in key: if pathseg in node: node = node[pathseg] else: break else: return bool(isinstance(node, list)) return False def clear(self) -> None: # much, much simpler/quicker than that defined in MutableMapping self._json_data.clear() self._data.clear() self._hashes.clear() def set_json(self, json_data: Dict[Text, Any]) -> None: """Provide the object with a raw JSON blob Note that this object graph is assumed to be owned by the TypeData object after the call, so the caller must not mutate any part of the graph. """ if self._json_data: raise ValueError("set_json call when JSON data is not empty") self._json_data = json_data def to_json(self) -> Dict[Text, Any]: """Convert the current data to JSON Note that the returned object may contain references to the internal data structures, and is only guaranteed to be valid until the next __getitem__, __setitem__, __delitem__ call, so the caller must not mutate any part of the returned object graph. """ json_rv = self._json_data.copy() def safe_sorter(element: Tuple[str,str]) -> Tuple[str,str]: """key function to sort lists with None values.""" if element and not element[0]: return ("", element[1]) else: return element stack: List[Tuple[Dict[Text, Any], Dict[Text, Any], Tuple[Text, ...]]] = [(self._data, json_rv, tuple())] while stack: data_node, json_node, par_full_key = stack.pop() for k, v in data_node.items(): full_key = par_full_key + (k,) if isinstance(v, set): assert k not in json_node json_node[k] = [self._hashes.get( full_key)] + [t for t in sorted((test.to_json() for test in v), key=safe_sorter)] else: json_node[k] = json_node.get(k, {}).copy() stack.append((v, json_node[k], full_key)) return json_rv class PathHash(PathHashType): def __init__(self, data: TypeData) -> None: self._data = data def __getitem__(self, k: Tuple[Text, ...]) -> Text: if k not in self._data: raise KeyError if k in self._data._hashes: return self._data._hashes[k] node = self._data._json_data for pathseg in k: if pathseg in node: node = node[pathseg] else: break else: return node[0] # type: ignore assert False, "unreachable" raise KeyError def __setitem__(self, k: Tuple[Text, ...], v: Text) -> None: if k not in self._data: raise KeyError if k in self._data._hashes: self._data._hashes[k] = v node = self._data._json_data for pathseg in k: if pathseg in node: node = node[pathseg] else: break else: node[0] = v # type: ignore return self._data._hashes[k] = v def __delitem__(self, k: Tuple[Text, ...]) -> None: raise ValueError("keys here must match underlying data") def __iter__(self) -> Iterator[Tuple[Text, ...]]: return iter(self._data) def __len__(self) -> int: return len(self._data)