diff options
Diffstat (limited to 'third_party/rust/jsparagus/jsparagus/actions.py')
-rw-r--r-- | third_party/rust/jsparagus/jsparagus/actions.py | 651 |
1 files changed, 651 insertions, 0 deletions
diff --git a/third_party/rust/jsparagus/jsparagus/actions.py b/third_party/rust/jsparagus/jsparagus/actions.py new file mode 100644 index 0000000000..29d810edbb --- /dev/null +++ b/third_party/rust/jsparagus/jsparagus/actions.py @@ -0,0 +1,651 @@ +from __future__ import annotations +# mypy: disallow-untyped-defs, disallow-incomplete-defs, disallow-untyped-calls + +import typing +import dataclasses + +from .ordered import OrderedFrozenSet +from .grammar import Element, ErrorSymbol, InitNt, Nt +from . import types, grammar + +# Avoid circular reference between this module and parse_table.py +if typing.TYPE_CHECKING: + from .parse_table import StateId + + +@dataclasses.dataclass(frozen=True) +class StackDiff: + """StackDiff represent stack mutations which have to be performed when executing an action. + """ + __slots__ = ['pop', 'nt', 'replay'] + + # Example: We have shifted `b * c X Y`. We want to reduce `b * c` to Mul. + # + # In the initial LR0 pass over the grammar, this produces a Reduce edge. + # + # action pop replay + # -------------- ------ --------- + # Reduce 3 (`b * c`) 2 (`X Y`) + # The parser moves `X Y` to the replay queue, pops `b * c`, creates the + # new `Mul` nonterminal, consults the stack and parse table to determine + # the new state id, then replays the new nonterminal. Reduce leaves `X Y` + # on the runtime replay queue. It's the runtime's responsibility to + # notice that they are there and replay them. + # + # Later, the Reduce edge might be lowered into an [Unwind; FilterState; + # Replay] sequence, which encode both the Reduce action, and the expected + # behavior of the runtime. + # + # action pop replay + # -------------- ------ --------- + # Unwind 3 2 + # The parser moves `X Y` to the replay queue, pops `b * c`, creates the + # new `Mul` nonterminal, and inserts it at the front of the replay queue. + # + # FilterState --- --- + # Determines the new state id, if it's context-dependent. + # This doesn't touch the stack, so no StackDiff. + # + # Replay 0 -3 + # Shift the three elements we left on the replay queue back to the stack: + # `(b*c) X Y`. + + # Number of elements to be popped from the stack, this is used when + # reducing the stack with a non-terminal. + # + # This number is always positive or zero. + pop: int + + # When reducing, a non-terminal is pushed after removing all replayed and + # popped elements. If not None, this is the non-terminal which is produced + # by reducing the action. + nt: typing.Union[Nt, ErrorSymbol, None] + + # Number of terms this action moves from the stack to the runtime replay + # queue (not counting `self.nt`), or from the replay queue to the stack if + # negative. + # + # When executing actions, some lookahead might have been used to make the + # parse table consistent. Replayed terms are popped before popping any + # elements from the stack, and they are added in reversed order in the + # replay list, such that they would be shifted after shifting the `reduced` + # non-terminal. + # + # This number might also be negative, in which case some lookahead terms + # are expected to exists in the replay list, and they are shifted back. + # This must happen only follow_edge is True. + replay: int + + def reduce_stack(self) -> bool: + """Returns whether the action is reducing the stack by replacing popped + elements by a non-terminal. Note, this test is simpler than checking + for instances, as Reduce / Unwind might either be present, or present + as part of the last element of a Seq action. """ + return self.nt is not None + + +class Action: + __slots__ = ["_hash"] + + # Cached hash. + _hash: typing.Optional[int] + + def __init__(self) -> None: + self._hash = None + + def is_inconsistent(self) -> bool: + """Returns True if this action is inconsistent. An action can be + inconsistent if the parameters it is given cannot be evaluated given + its current location in the parse table. Such as CheckNotOnNewLine. + """ + return False + + def is_condition(self) -> bool: + "Unordered condition, which accept or not to reach the next state." + return False + + def condition(self) -> Action: + "Return the conditional action." + raise TypeError("Action.condition not implemented") + + def check_same_variable(self, other: Action) -> bool: + "Return whether both conditionals are checking the same variable." + assert self.is_condition() + raise TypeError("Action.check_same_variable not implemented") + + def check_different_values(self, other: Action) -> bool: + "Return whether these 2 conditions are mutually exclusive." + assert self.is_condition() + raise TypeError("Action.check_different_values not implemented") + + def follow_edge(self) -> bool: + """Whether the execution of this action resume following the epsilon transition + (True) or if it breaks the graph epsilon transition (False) and returns + at a different location, defined by the top of the stack.""" + return True + + def update_stack(self) -> bool: + """Whether the execution of this action changes the parser stack.""" + return False + + def update_stack_with(self) -> StackDiff: + """Returns a StackDiff which represents the mutation to be applied to the + parser stack.""" + assert self.update_stack() + raise TypeError("Action.update_stack_with not implemented") + + def unshift_action(self, num: int) -> Action: + """When manipulating stack operation, we have the option to unshift some + replayed token which were shifted to disambiguate the grammar. However, + they might no longer be needed in some cases.""" + raise TypeError("{} cannot be unshifted".format(self.__class__.__name__)) + + def shifted_action(self, shifted_term: Element) -> ShiftedAction: + """Transpose this action with shifting the given terminal or Nt. + + That is, the sequence of: + - performing the action `self`, then + - shifting `shifted_term` + has the same effect as: + - shifting `shifted_term`, then + - performing the action `self.shifted_action(shifted_term)`. + + If the resulting shifted action would be a no-op, instead return True. + + If this is a conditional action and `shifted_term` indicates that the + condition wasn't met, return False. + """ + return self + + def contains_accept(self) -> bool: + "Returns whether the current action stops the parser." + return False + + def rewrite_state_indexes(self, state_map: typing.Dict[StateId, StateId]) -> Action: + """If the action contains any state index, use the map to map the old index to + the new indexes""" + return self + + def fold_by_destination(self, actions: typing.List[Action]) -> typing.List[Action]: + """If after rewriting state indexes, multiple condition are reaching the same + destination state, we attempt to fold them by destination. Not + implementing this function can lead to the introduction of inconsistent + states, as the conditions might be redundant. """ + + # By default do nothing. + return actions + + def state_refs(self) -> typing.List[StateId]: + """List of states which are referenced by this action.""" + # By default do nothing. + return [] + + def __eq__(self, other: object) -> bool: + if self.__class__ != other.__class__: + return False + assert isinstance(other, Action) + for s in self.__slots__: + if getattr(self, s) != getattr(other, s): + return False + return True + + def __hash__(self) -> int: + if self._hash is not None: + return self._hash + + def hashed_content() -> typing.Iterator[object]: + yield self.__class__ + for s in self.__slots__: + yield repr(getattr(self, s)) + + self._hash = hash(tuple(hashed_content())) + return self._hash + + def __lt__(self, other: Action) -> bool: + return hash(self) < hash(other) + + def __repr__(self) -> str: + return str(self) + + def stable_str(self, states: typing.Any) -> str: + return str(self) + + +ShiftedAction = typing.Union[Action, bool] + + +class Replay(Action): + """Replay a term which was previously saved by the Unwind function. Note that + this does not Shift a term given as argument as the replay action should + always be garanteed and that we want to maximize the sharing of code when + possible.""" + __slots__ = ['replay_steps'] + + replay_steps: typing.Tuple[StateId, ...] + + def __init__(self, replay_steps: typing.Iterable[StateId]): + super().__init__() + self.replay_steps = tuple(replay_steps) + + def update_stack(self) -> bool: + return True + + def update_stack_with(self) -> StackDiff: + return StackDiff(0, None, -len(self.replay_steps)) + + def rewrite_state_indexes(self, state_map: typing.Dict[StateId, StateId]) -> Replay: + return Replay(map(lambda s: state_map[s], self.replay_steps)) + + def state_refs(self) -> typing.List[StateId]: + return list(self.replay_steps) + + def __str__(self) -> str: + return "Replay({})".format(str(self.replay_steps)) + + +class Unwind(Action): + """Define an unwind operation which pops N elements of the stack and pushes one + non-terminal. The replay argument of an unwind action corresponds to the + number of stack elements which would have to be popped and pushed again + using the parser table after executing this operation.""" + __slots__ = ['nt', 'replay', 'pop'] + + nt: Nt + pop: int + replay: int + + def __init__(self, nt: Nt, pop: int, replay: int = 0) -> None: + super().__init__() + self.nt = nt # Non-terminal which is reduced + self.pop = pop # Number of stack elements which should be replayed. + self.replay = replay # List of terms to shift back + + def __str__(self) -> str: + return "Unwind({}, {}, {})".format(self.nt, self.pop, self.replay) + + def update_stack(self) -> bool: + return True + + def update_stack_with(self) -> StackDiff: + return StackDiff(self.pop, self.nt, self.replay) + + def unshift_action(self, num: int) -> Unwind: + return Unwind(self.nt, self.pop, replay=self.replay - num) + + def shifted_action(self, shifted_term: Element) -> Unwind: + return Unwind(self.nt, self.pop, replay=self.replay + 1) + + +class Reduce(Action): + """Prevent the fall-through to the epsilon transition and returns to the shift + table execution to resume shifting or replaying terms.""" + __slots__ = ['unwind'] + + unwind: Unwind + + def __init__(self, unwind: Unwind) -> None: + nt_name = unwind.nt.name + if isinstance(nt_name, InitNt): + name = "Start_" + str(nt_name.goal.name) + else: + name = nt_name + super().__init__() + self.unwind = unwind + + def __str__(self) -> str: + return "Reduce({})".format(str(self.unwind)) + + def follow_edge(self) -> bool: + return False + + def update_stack(self) -> bool: + return self.unwind.update_stack() + + def update_stack_with(self) -> StackDiff: + return self.unwind.update_stack_with() + + def unshift_action(self, num: int) -> Reduce: + unwind = self.unwind.unshift_action(num) + return Reduce(unwind) + + def shifted_action(self, shifted_term: Element) -> Reduce: + unwind = self.unwind.shifted_action(shifted_term) + return Reduce(unwind) + + +class Accept(Action): + """This state terminate the parser by accepting the content consumed until + now.""" + __slots__: typing.List[str] = [] + + def __init__(self) -> None: + super().__init__() + + def __str__(self) -> str: + return "Accept()" + + def contains_accept(self) -> bool: + "Returns whether the current action stops the parser." + return True + + def shifted_action(self, shifted_term: Element) -> Accept: + return Accept() + + +class Lookahead(Action): + """Define a Lookahead assertion which is meant to either accept or reject + sequences of terminal/non-terminals sequences.""" + __slots__ = ['terms', 'accept'] + + terms: typing.FrozenSet[str] + accept: bool + + def __init__(self, terms: typing.FrozenSet[str], accept: bool): + super().__init__() + self.terms = terms + self.accept = accept + + def is_inconsistent(self) -> bool: + # A lookahead restriction cannot be encoded in code, it has to be + # solved using fix_with_lookahead, which encodes the lookahead + # resolution in the generated parse table. + return True + + def is_condition(self) -> bool: + return True + + def condition(self) -> Lookahead: + return self + + def check_same_variable(self, other: Action) -> bool: + raise TypeError("Lookahead.check_same_variables: Lookahead are always inconsistent") + + def check_different_values(self, other: Action) -> bool: + raise TypeError("Lookahead.check_different_values: Lookahead are always inconsistent") + + def __str__(self) -> str: + return "Lookahead({}, {})".format(self.terms, self.accept) + + def shifted_action(self, shifted_term: Element) -> ShiftedAction: + if isinstance(shifted_term, Nt): + return True + if shifted_term in self.terms: + return self.accept + return not self.accept + + +class CheckNotOnNewLine(Action): + """Check whether the terminal at the given stack offset is on a new line or + not. If not this would produce an Error, otherwise this rule would be + shifted.""" + __slots__ = ['offset'] + + offset: int + + def __init__(self, offset: int = 0) -> None: + # assert offset >= -1 and "Smaller offsets are not supported on all backends." + super().__init__() + self.offset = offset + + def is_inconsistent(self) -> bool: + # We can only look at stacked terminals. Having an offset of 0 implies + # that we are looking for the next terminal, which is not yet shifted. + # Therefore this action is inconsistent as long as the terminal is not + # on the stack. + return self.offset >= 0 + + def is_condition(self) -> bool: + return True + + def condition(self) -> CheckNotOnNewLine: + return self + + def check_same_variable(self, other: Action) -> bool: + return isinstance(other, CheckNotOnNewLine) and self.offset == other.offset + + def check_different_values(self, other: Action) -> bool: + return False + + def shifted_action(self, shifted_term: Element) -> ShiftedAction: + if isinstance(shifted_term, Nt): + return True + return CheckNotOnNewLine(self.offset - 1) + + def __str__(self) -> str: + return "CheckNotOnNewLine({})".format(self.offset) + + +class FilterStates(Action): + """Check whether the stack at a given depth match the state value, if so + transition to the destination, otherwise check other states.""" + __slots__ = ['states'] + + states: OrderedFrozenSet[StateId] + + def __init__(self, states: typing.Iterable[StateId]): + super().__init__() + # Set of states which can follow this transition. + self.states = OrderedFrozenSet(sorted(states)) + + def is_condition(self) -> bool: + return True + + def condition(self) -> FilterStates: + return self + + def check_same_variable(self, other: Action) -> bool: + return isinstance(other, FilterStates) + + def check_different_values(self, other: Action) -> bool: + assert isinstance(other, FilterStates) + return self.states.is_disjoint(other.states) + + def rewrite_state_indexes(self, state_map: typing.Dict[StateId, StateId]) -> FilterStates: + states = list(state_map[s] for s in self.states) + return FilterStates(states) + + def fold_by_destination(self, actions: typing.List[Action]) -> typing.List[Action]: + states: typing.List[StateId] = [] + for a in actions: + if not isinstance(a, FilterStates): + # Do nothing in case the state is inconsistent. + return actions + states.extend(a.states) + return [FilterStates(states)] + + def state_refs(self) -> typing.List[StateId]: + return list(self.states) + + def __str__(self) -> str: + return "FilterStates({})".format(self.states) + + +class FilterFlag(Action): + """Define a filter which check for one value of the flag, and continue to the + next state if the top of the flag stack matches the expected value.""" + __slots__ = ['flag', 'value'] + + flag: str + value: object + + def __init__(self, flag: str, value: object) -> None: + super().__init__() + self.flag = flag + self.value = value + + def is_condition(self) -> bool: + return True + + def condition(self) -> FilterFlag: + return self + + def check_same_variable(self, other: Action) -> bool: + return isinstance(other, FilterFlag) and self.flag == other.flag + + def check_different_values(self, other: Action) -> bool: + assert isinstance(other, FilterFlag) + return self.value != other.value + + def __str__(self) -> str: + return "FilterFlag({}, {})".format(self.flag, self.value) + + +class PushFlag(Action): + """Define an action which pushes a value on a stack dedicated to the flag. This + other stack correspond to another parse stack which live next to the + default state machine and is popped by PopFlag, as-if this was another + reduce action. This is particularly useful to raise the parse table from a + LR(0) to an LR(k) without needing as much state duplications.""" + __slots__ = ['flag', 'value'] + + flag: str + value: object + + def __init__(self, flag: str, value: object) -> None: + super().__init__() + self.flag = flag + self.value = value + + def __str__(self) -> str: + return "PushFlag({}, {})".format(self.flag, self.value) + + +class PopFlag(Action): + """Define an action which pops a flag from the flag bit stack.""" + __slots__ = ['flag'] + + flag: str + + def __init__(self, flag: str) -> None: + super().__init__() + self.flag = flag + + def __str__(self) -> str: + return "PopFlag({})".format(self.flag) + + +# OutputExpr: An expression mini-language that compiles very directly to code +# in the output language (Rust or Python). An OutputExpr is one of: +# +# str - an identifier in the generated code +# int - an index into the runtime stack +# None or Some(FunCallArg) - an optional value +# +OutputExpr = typing.Union[str, int, None, grammar.Some] + + +class FunCall(Action): + """Define a call method operation which reads N elements of he stack and + pushpathne non-terminal. The replay attribute of a reduce action correspond + to the number of stack elements which would have to be popped and pushed + again using the parser table after reducing this operation. """ + __slots__ = ['trait', 'method', 'offset', 'args', 'fallible', 'set_to'] + + trait: types.Type + method: str + offset: int + args: typing.Tuple[OutputExpr, ...] + fallible: bool + set_to: str + + def __init__( + self, + method: str, + args: typing.Tuple[OutputExpr, ...], + trait: types.Type = types.Type("AstBuilder"), + fallible: bool = False, + set_to: str = "val", + offset: int = 0, + ) -> None: + super().__init__() + self.trait = trait # Trait on which this method is implemented. + self.method = method # Method and argument to be read for calling it. + self.fallible = fallible # Whether the function call can fail. + self.offset = offset # Offset to add to each argument offset. + self.args = args # Tuple of arguments offsets. + self.set_to = set_to # Temporary variable name to set with the result. + + def __str__(self) -> str: + return "{} = {}::{}({}){} [off: {}]".format( + self.set_to, self.trait, self.method, + ", ".join(map(str, self.args)), + self.fallible and '?' or '', + self.offset) + + def __repr__(self) -> str: + return "FunCall({})".format(', '.join(map(repr, [ + self.trait, self.method, self.fallible, + self.args, self.set_to, self.offset + ]))) + + def unshift_action(self, num: int) -> FunCall: + return FunCall(self.method, self.args, + trait=self.trait, + fallible=self.fallible, + set_to=self.set_to, + offset=self.offset - num) + + def shifted_action(self, shifted_term: Element) -> FunCall: + return FunCall(self.method, + self.args, + trait=self.trait, + fallible=self.fallible, + set_to=self.set_to, + offset=self.offset + 1) + + +class Seq(Action): + """Aggregate multiple actions in one statement. Note, that the aggregated + actions should not contain any condition or action which are mutating the + state. Only the last action aggregated can update the parser stack""" + __slots__ = ['actions'] + + actions: typing.Tuple[Action, ...] + + def __init__(self, actions: typing.Sequence[Action]) -> None: + super().__init__() + self.actions = tuple(actions) # Ordered list of actions to execute. + assert all([not a.is_condition() for a in actions]) + assert all([not isinstance(a, Seq) for a in actions]) + assert all([a.follow_edge() for a in actions[:-1]]) + assert all([not a.update_stack() for a in actions[:-1]]) + + def __str__(self) -> str: + return "{{ {} }}".format("; ".join(map(str, self.actions))) + + def __repr__(self) -> str: + return "Seq({})".format(repr(self.actions)) + + def follow_edge(self) -> bool: + return self.actions[-1].follow_edge() + + def update_stack(self) -> bool: + return self.actions[-1].update_stack() + + def update_stack_with(self) -> StackDiff: + return self.actions[-1].update_stack_with() + + def unshift_action(self, num: int) -> Seq: + actions = list(map(lambda a: a.unshift_action(num), self.actions)) + return Seq(actions) + + def shifted_action(self, shift: Element) -> ShiftedAction: + actions: typing.List[Action] = [] + for a in self.actions: + b = a.shifted_action(shift) + if isinstance(b, bool): + if b is False: + return False + else: + actions.append(b) + return Seq(actions) + + def contains_accept(self) -> bool: + return any(a.contains_accept() for a in self.actions) + + def rewrite_state_indexes(self, state_map: typing.Dict[StateId, StateId]) -> Seq: + actions = list(map(lambda a: a.rewrite_state_indexes(state_map), self.actions)) + return Seq(actions) + + def state_refs(self) -> typing.List[StateId]: + return [s for a in self.actions for s in a.state_refs()] |