summaryrefslogtreecommitdiffstats
path: root/third_party/rust/jsparagus/jsparagus/actions.py
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/jsparagus/jsparagus/actions.py')
-rw-r--r--third_party/rust/jsparagus/jsparagus/actions.py651
1 files changed, 651 insertions, 0 deletions
diff --git a/third_party/rust/jsparagus/jsparagus/actions.py b/third_party/rust/jsparagus/jsparagus/actions.py
new file mode 100644
index 0000000000..29d810edbb
--- /dev/null
+++ b/third_party/rust/jsparagus/jsparagus/actions.py
@@ -0,0 +1,651 @@
+from __future__ import annotations
+# mypy: disallow-untyped-defs, disallow-incomplete-defs, disallow-untyped-calls
+
+import typing
+import dataclasses
+
+from .ordered import OrderedFrozenSet
+from .grammar import Element, ErrorSymbol, InitNt, Nt
+from . import types, grammar
+
+# Avoid circular reference between this module and parse_table.py
+if typing.TYPE_CHECKING:
+ from .parse_table import StateId
+
+
+@dataclasses.dataclass(frozen=True)
+class StackDiff:
+ """StackDiff represent stack mutations which have to be performed when executing an action.
+ """
+ __slots__ = ['pop', 'nt', 'replay']
+
+ # Example: We have shifted `b * c X Y`. We want to reduce `b * c` to Mul.
+ #
+ # In the initial LR0 pass over the grammar, this produces a Reduce edge.
+ #
+ # action pop replay
+ # -------------- ------ ---------
+ # Reduce 3 (`b * c`) 2 (`X Y`)
+ # The parser moves `X Y` to the replay queue, pops `b * c`, creates the
+ # new `Mul` nonterminal, consults the stack and parse table to determine
+ # the new state id, then replays the new nonterminal. Reduce leaves `X Y`
+ # on the runtime replay queue. It's the runtime's responsibility to
+ # notice that they are there and replay them.
+ #
+ # Later, the Reduce edge might be lowered into an [Unwind; FilterState;
+ # Replay] sequence, which encode both the Reduce action, and the expected
+ # behavior of the runtime.
+ #
+ # action pop replay
+ # -------------- ------ ---------
+ # Unwind 3 2
+ # The parser moves `X Y` to the replay queue, pops `b * c`, creates the
+ # new `Mul` nonterminal, and inserts it at the front of the replay queue.
+ #
+ # FilterState --- ---
+ # Determines the new state id, if it's context-dependent.
+ # This doesn't touch the stack, so no StackDiff.
+ #
+ # Replay 0 -3
+ # Shift the three elements we left on the replay queue back to the stack:
+ # `(b*c) X Y`.
+
+ # Number of elements to be popped from the stack, this is used when
+ # reducing the stack with a non-terminal.
+ #
+ # This number is always positive or zero.
+ pop: int
+
+ # When reducing, a non-terminal is pushed after removing all replayed and
+ # popped elements. If not None, this is the non-terminal which is produced
+ # by reducing the action.
+ nt: typing.Union[Nt, ErrorSymbol, None]
+
+ # Number of terms this action moves from the stack to the runtime replay
+ # queue (not counting `self.nt`), or from the replay queue to the stack if
+ # negative.
+ #
+ # When executing actions, some lookahead might have been used to make the
+ # parse table consistent. Replayed terms are popped before popping any
+ # elements from the stack, and they are added in reversed order in the
+ # replay list, such that they would be shifted after shifting the `reduced`
+ # non-terminal.
+ #
+ # This number might also be negative, in which case some lookahead terms
+ # are expected to exists in the replay list, and they are shifted back.
+ # This must happen only follow_edge is True.
+ replay: int
+
+ def reduce_stack(self) -> bool:
+ """Returns whether the action is reducing the stack by replacing popped
+ elements by a non-terminal. Note, this test is simpler than checking
+ for instances, as Reduce / Unwind might either be present, or present
+ as part of the last element of a Seq action. """
+ return self.nt is not None
+
+
+class Action:
+ __slots__ = ["_hash"]
+
+ # Cached hash.
+ _hash: typing.Optional[int]
+
+ def __init__(self) -> None:
+ self._hash = None
+
+ def is_inconsistent(self) -> bool:
+ """Returns True if this action is inconsistent. An action can be
+ inconsistent if the parameters it is given cannot be evaluated given
+ its current location in the parse table. Such as CheckNotOnNewLine.
+ """
+ return False
+
+ def is_condition(self) -> bool:
+ "Unordered condition, which accept or not to reach the next state."
+ return False
+
+ def condition(self) -> Action:
+ "Return the conditional action."
+ raise TypeError("Action.condition not implemented")
+
+ def check_same_variable(self, other: Action) -> bool:
+ "Return whether both conditionals are checking the same variable."
+ assert self.is_condition()
+ raise TypeError("Action.check_same_variable not implemented")
+
+ def check_different_values(self, other: Action) -> bool:
+ "Return whether these 2 conditions are mutually exclusive."
+ assert self.is_condition()
+ raise TypeError("Action.check_different_values not implemented")
+
+ def follow_edge(self) -> bool:
+ """Whether the execution of this action resume following the epsilon transition
+ (True) or if it breaks the graph epsilon transition (False) and returns
+ at a different location, defined by the top of the stack."""
+ return True
+
+ def update_stack(self) -> bool:
+ """Whether the execution of this action changes the parser stack."""
+ return False
+
+ def update_stack_with(self) -> StackDiff:
+ """Returns a StackDiff which represents the mutation to be applied to the
+ parser stack."""
+ assert self.update_stack()
+ raise TypeError("Action.update_stack_with not implemented")
+
+ def unshift_action(self, num: int) -> Action:
+ """When manipulating stack operation, we have the option to unshift some
+ replayed token which were shifted to disambiguate the grammar. However,
+ they might no longer be needed in some cases."""
+ raise TypeError("{} cannot be unshifted".format(self.__class__.__name__))
+
+ def shifted_action(self, shifted_term: Element) -> ShiftedAction:
+ """Transpose this action with shifting the given terminal or Nt.
+
+ That is, the sequence of:
+ - performing the action `self`, then
+ - shifting `shifted_term`
+ has the same effect as:
+ - shifting `shifted_term`, then
+ - performing the action `self.shifted_action(shifted_term)`.
+
+ If the resulting shifted action would be a no-op, instead return True.
+
+ If this is a conditional action and `shifted_term` indicates that the
+ condition wasn't met, return False.
+ """
+ return self
+
+ def contains_accept(self) -> bool:
+ "Returns whether the current action stops the parser."
+ return False
+
+ def rewrite_state_indexes(self, state_map: typing.Dict[StateId, StateId]) -> Action:
+ """If the action contains any state index, use the map to map the old index to
+ the new indexes"""
+ return self
+
+ def fold_by_destination(self, actions: typing.List[Action]) -> typing.List[Action]:
+ """If after rewriting state indexes, multiple condition are reaching the same
+ destination state, we attempt to fold them by destination. Not
+ implementing this function can lead to the introduction of inconsistent
+ states, as the conditions might be redundant. """
+
+ # By default do nothing.
+ return actions
+
+ def state_refs(self) -> typing.List[StateId]:
+ """List of states which are referenced by this action."""
+ # By default do nothing.
+ return []
+
+ def __eq__(self, other: object) -> bool:
+ if self.__class__ != other.__class__:
+ return False
+ assert isinstance(other, Action)
+ for s in self.__slots__:
+ if getattr(self, s) != getattr(other, s):
+ return False
+ return True
+
+ def __hash__(self) -> int:
+ if self._hash is not None:
+ return self._hash
+
+ def hashed_content() -> typing.Iterator[object]:
+ yield self.__class__
+ for s in self.__slots__:
+ yield repr(getattr(self, s))
+
+ self._hash = hash(tuple(hashed_content()))
+ return self._hash
+
+ def __lt__(self, other: Action) -> bool:
+ return hash(self) < hash(other)
+
+ def __repr__(self) -> str:
+ return str(self)
+
+ def stable_str(self, states: typing.Any) -> str:
+ return str(self)
+
+
+ShiftedAction = typing.Union[Action, bool]
+
+
+class Replay(Action):
+ """Replay a term which was previously saved by the Unwind function. Note that
+ this does not Shift a term given as argument as the replay action should
+ always be garanteed and that we want to maximize the sharing of code when
+ possible."""
+ __slots__ = ['replay_steps']
+
+ replay_steps: typing.Tuple[StateId, ...]
+
+ def __init__(self, replay_steps: typing.Iterable[StateId]):
+ super().__init__()
+ self.replay_steps = tuple(replay_steps)
+
+ def update_stack(self) -> bool:
+ return True
+
+ def update_stack_with(self) -> StackDiff:
+ return StackDiff(0, None, -len(self.replay_steps))
+
+ def rewrite_state_indexes(self, state_map: typing.Dict[StateId, StateId]) -> Replay:
+ return Replay(map(lambda s: state_map[s], self.replay_steps))
+
+ def state_refs(self) -> typing.List[StateId]:
+ return list(self.replay_steps)
+
+ def __str__(self) -> str:
+ return "Replay({})".format(str(self.replay_steps))
+
+
+class Unwind(Action):
+ """Define an unwind operation which pops N elements of the stack and pushes one
+ non-terminal. The replay argument of an unwind action corresponds to the
+ number of stack elements which would have to be popped and pushed again
+ using the parser table after executing this operation."""
+ __slots__ = ['nt', 'replay', 'pop']
+
+ nt: Nt
+ pop: int
+ replay: int
+
+ def __init__(self, nt: Nt, pop: int, replay: int = 0) -> None:
+ super().__init__()
+ self.nt = nt # Non-terminal which is reduced
+ self.pop = pop # Number of stack elements which should be replayed.
+ self.replay = replay # List of terms to shift back
+
+ def __str__(self) -> str:
+ return "Unwind({}, {}, {})".format(self.nt, self.pop, self.replay)
+
+ def update_stack(self) -> bool:
+ return True
+
+ def update_stack_with(self) -> StackDiff:
+ return StackDiff(self.pop, self.nt, self.replay)
+
+ def unshift_action(self, num: int) -> Unwind:
+ return Unwind(self.nt, self.pop, replay=self.replay - num)
+
+ def shifted_action(self, shifted_term: Element) -> Unwind:
+ return Unwind(self.nt, self.pop, replay=self.replay + 1)
+
+
+class Reduce(Action):
+ """Prevent the fall-through to the epsilon transition and returns to the shift
+ table execution to resume shifting or replaying terms."""
+ __slots__ = ['unwind']
+
+ unwind: Unwind
+
+ def __init__(self, unwind: Unwind) -> None:
+ nt_name = unwind.nt.name
+ if isinstance(nt_name, InitNt):
+ name = "Start_" + str(nt_name.goal.name)
+ else:
+ name = nt_name
+ super().__init__()
+ self.unwind = unwind
+
+ def __str__(self) -> str:
+ return "Reduce({})".format(str(self.unwind))
+
+ def follow_edge(self) -> bool:
+ return False
+
+ def update_stack(self) -> bool:
+ return self.unwind.update_stack()
+
+ def update_stack_with(self) -> StackDiff:
+ return self.unwind.update_stack_with()
+
+ def unshift_action(self, num: int) -> Reduce:
+ unwind = self.unwind.unshift_action(num)
+ return Reduce(unwind)
+
+ def shifted_action(self, shifted_term: Element) -> Reduce:
+ unwind = self.unwind.shifted_action(shifted_term)
+ return Reduce(unwind)
+
+
+class Accept(Action):
+ """This state terminate the parser by accepting the content consumed until
+ now."""
+ __slots__: typing.List[str] = []
+
+ def __init__(self) -> None:
+ super().__init__()
+
+ def __str__(self) -> str:
+ return "Accept()"
+
+ def contains_accept(self) -> bool:
+ "Returns whether the current action stops the parser."
+ return True
+
+ def shifted_action(self, shifted_term: Element) -> Accept:
+ return Accept()
+
+
+class Lookahead(Action):
+ """Define a Lookahead assertion which is meant to either accept or reject
+ sequences of terminal/non-terminals sequences."""
+ __slots__ = ['terms', 'accept']
+
+ terms: typing.FrozenSet[str]
+ accept: bool
+
+ def __init__(self, terms: typing.FrozenSet[str], accept: bool):
+ super().__init__()
+ self.terms = terms
+ self.accept = accept
+
+ def is_inconsistent(self) -> bool:
+ # A lookahead restriction cannot be encoded in code, it has to be
+ # solved using fix_with_lookahead, which encodes the lookahead
+ # resolution in the generated parse table.
+ return True
+
+ def is_condition(self) -> bool:
+ return True
+
+ def condition(self) -> Lookahead:
+ return self
+
+ def check_same_variable(self, other: Action) -> bool:
+ raise TypeError("Lookahead.check_same_variables: Lookahead are always inconsistent")
+
+ def check_different_values(self, other: Action) -> bool:
+ raise TypeError("Lookahead.check_different_values: Lookahead are always inconsistent")
+
+ def __str__(self) -> str:
+ return "Lookahead({}, {})".format(self.terms, self.accept)
+
+ def shifted_action(self, shifted_term: Element) -> ShiftedAction:
+ if isinstance(shifted_term, Nt):
+ return True
+ if shifted_term in self.terms:
+ return self.accept
+ return not self.accept
+
+
+class CheckNotOnNewLine(Action):
+ """Check whether the terminal at the given stack offset is on a new line or
+ not. If not this would produce an Error, otherwise this rule would be
+ shifted."""
+ __slots__ = ['offset']
+
+ offset: int
+
+ def __init__(self, offset: int = 0) -> None:
+ # assert offset >= -1 and "Smaller offsets are not supported on all backends."
+ super().__init__()
+ self.offset = offset
+
+ def is_inconsistent(self) -> bool:
+ # We can only look at stacked terminals. Having an offset of 0 implies
+ # that we are looking for the next terminal, which is not yet shifted.
+ # Therefore this action is inconsistent as long as the terminal is not
+ # on the stack.
+ return self.offset >= 0
+
+ def is_condition(self) -> bool:
+ return True
+
+ def condition(self) -> CheckNotOnNewLine:
+ return self
+
+ def check_same_variable(self, other: Action) -> bool:
+ return isinstance(other, CheckNotOnNewLine) and self.offset == other.offset
+
+ def check_different_values(self, other: Action) -> bool:
+ return False
+
+ def shifted_action(self, shifted_term: Element) -> ShiftedAction:
+ if isinstance(shifted_term, Nt):
+ return True
+ return CheckNotOnNewLine(self.offset - 1)
+
+ def __str__(self) -> str:
+ return "CheckNotOnNewLine({})".format(self.offset)
+
+
+class FilterStates(Action):
+ """Check whether the stack at a given depth match the state value, if so
+ transition to the destination, otherwise check other states."""
+ __slots__ = ['states']
+
+ states: OrderedFrozenSet[StateId]
+
+ def __init__(self, states: typing.Iterable[StateId]):
+ super().__init__()
+ # Set of states which can follow this transition.
+ self.states = OrderedFrozenSet(sorted(states))
+
+ def is_condition(self) -> bool:
+ return True
+
+ def condition(self) -> FilterStates:
+ return self
+
+ def check_same_variable(self, other: Action) -> bool:
+ return isinstance(other, FilterStates)
+
+ def check_different_values(self, other: Action) -> bool:
+ assert isinstance(other, FilterStates)
+ return self.states.is_disjoint(other.states)
+
+ def rewrite_state_indexes(self, state_map: typing.Dict[StateId, StateId]) -> FilterStates:
+ states = list(state_map[s] for s in self.states)
+ return FilterStates(states)
+
+ def fold_by_destination(self, actions: typing.List[Action]) -> typing.List[Action]:
+ states: typing.List[StateId] = []
+ for a in actions:
+ if not isinstance(a, FilterStates):
+ # Do nothing in case the state is inconsistent.
+ return actions
+ states.extend(a.states)
+ return [FilterStates(states)]
+
+ def state_refs(self) -> typing.List[StateId]:
+ return list(self.states)
+
+ def __str__(self) -> str:
+ return "FilterStates({})".format(self.states)
+
+
+class FilterFlag(Action):
+ """Define a filter which check for one value of the flag, and continue to the
+ next state if the top of the flag stack matches the expected value."""
+ __slots__ = ['flag', 'value']
+
+ flag: str
+ value: object
+
+ def __init__(self, flag: str, value: object) -> None:
+ super().__init__()
+ self.flag = flag
+ self.value = value
+
+ def is_condition(self) -> bool:
+ return True
+
+ def condition(self) -> FilterFlag:
+ return self
+
+ def check_same_variable(self, other: Action) -> bool:
+ return isinstance(other, FilterFlag) and self.flag == other.flag
+
+ def check_different_values(self, other: Action) -> bool:
+ assert isinstance(other, FilterFlag)
+ return self.value != other.value
+
+ def __str__(self) -> str:
+ return "FilterFlag({}, {})".format(self.flag, self.value)
+
+
+class PushFlag(Action):
+ """Define an action which pushes a value on a stack dedicated to the flag. This
+ other stack correspond to another parse stack which live next to the
+ default state machine and is popped by PopFlag, as-if this was another
+ reduce action. This is particularly useful to raise the parse table from a
+ LR(0) to an LR(k) without needing as much state duplications."""
+ __slots__ = ['flag', 'value']
+
+ flag: str
+ value: object
+
+ def __init__(self, flag: str, value: object) -> None:
+ super().__init__()
+ self.flag = flag
+ self.value = value
+
+ def __str__(self) -> str:
+ return "PushFlag({}, {})".format(self.flag, self.value)
+
+
+class PopFlag(Action):
+ """Define an action which pops a flag from the flag bit stack."""
+ __slots__ = ['flag']
+
+ flag: str
+
+ def __init__(self, flag: str) -> None:
+ super().__init__()
+ self.flag = flag
+
+ def __str__(self) -> str:
+ return "PopFlag({})".format(self.flag)
+
+
+# OutputExpr: An expression mini-language that compiles very directly to code
+# in the output language (Rust or Python). An OutputExpr is one of:
+#
+# str - an identifier in the generated code
+# int - an index into the runtime stack
+# None or Some(FunCallArg) - an optional value
+#
+OutputExpr = typing.Union[str, int, None, grammar.Some]
+
+
+class FunCall(Action):
+ """Define a call method operation which reads N elements of he stack and
+ pushpathne non-terminal. The replay attribute of a reduce action correspond
+ to the number of stack elements which would have to be popped and pushed
+ again using the parser table after reducing this operation. """
+ __slots__ = ['trait', 'method', 'offset', 'args', 'fallible', 'set_to']
+
+ trait: types.Type
+ method: str
+ offset: int
+ args: typing.Tuple[OutputExpr, ...]
+ fallible: bool
+ set_to: str
+
+ def __init__(
+ self,
+ method: str,
+ args: typing.Tuple[OutputExpr, ...],
+ trait: types.Type = types.Type("AstBuilder"),
+ fallible: bool = False,
+ set_to: str = "val",
+ offset: int = 0,
+ ) -> None:
+ super().__init__()
+ self.trait = trait # Trait on which this method is implemented.
+ self.method = method # Method and argument to be read for calling it.
+ self.fallible = fallible # Whether the function call can fail.
+ self.offset = offset # Offset to add to each argument offset.
+ self.args = args # Tuple of arguments offsets.
+ self.set_to = set_to # Temporary variable name to set with the result.
+
+ def __str__(self) -> str:
+ return "{} = {}::{}({}){} [off: {}]".format(
+ self.set_to, self.trait, self.method,
+ ", ".join(map(str, self.args)),
+ self.fallible and '?' or '',
+ self.offset)
+
+ def __repr__(self) -> str:
+ return "FunCall({})".format(', '.join(map(repr, [
+ self.trait, self.method, self.fallible,
+ self.args, self.set_to, self.offset
+ ])))
+
+ def unshift_action(self, num: int) -> FunCall:
+ return FunCall(self.method, self.args,
+ trait=self.trait,
+ fallible=self.fallible,
+ set_to=self.set_to,
+ offset=self.offset - num)
+
+ def shifted_action(self, shifted_term: Element) -> FunCall:
+ return FunCall(self.method,
+ self.args,
+ trait=self.trait,
+ fallible=self.fallible,
+ set_to=self.set_to,
+ offset=self.offset + 1)
+
+
+class Seq(Action):
+ """Aggregate multiple actions in one statement. Note, that the aggregated
+ actions should not contain any condition or action which are mutating the
+ state. Only the last action aggregated can update the parser stack"""
+ __slots__ = ['actions']
+
+ actions: typing.Tuple[Action, ...]
+
+ def __init__(self, actions: typing.Sequence[Action]) -> None:
+ super().__init__()
+ self.actions = tuple(actions) # Ordered list of actions to execute.
+ assert all([not a.is_condition() for a in actions])
+ assert all([not isinstance(a, Seq) for a in actions])
+ assert all([a.follow_edge() for a in actions[:-1]])
+ assert all([not a.update_stack() for a in actions[:-1]])
+
+ def __str__(self) -> str:
+ return "{{ {} }}".format("; ".join(map(str, self.actions)))
+
+ def __repr__(self) -> str:
+ return "Seq({})".format(repr(self.actions))
+
+ def follow_edge(self) -> bool:
+ return self.actions[-1].follow_edge()
+
+ def update_stack(self) -> bool:
+ return self.actions[-1].update_stack()
+
+ def update_stack_with(self) -> StackDiff:
+ return self.actions[-1].update_stack_with()
+
+ def unshift_action(self, num: int) -> Seq:
+ actions = list(map(lambda a: a.unshift_action(num), self.actions))
+ return Seq(actions)
+
+ def shifted_action(self, shift: Element) -> ShiftedAction:
+ actions: typing.List[Action] = []
+ for a in self.actions:
+ b = a.shifted_action(shift)
+ if isinstance(b, bool):
+ if b is False:
+ return False
+ else:
+ actions.append(b)
+ return Seq(actions)
+
+ def contains_accept(self) -> bool:
+ return any(a.contains_accept() for a in self.actions)
+
+ def rewrite_state_indexes(self, state_map: typing.Dict[StateId, StateId]) -> Seq:
+ actions = list(map(lambda a: a.rewrite_state_indexes(state_map), self.actions))
+ return Seq(actions)
+
+ def state_refs(self) -> typing.List[StateId]:
+ return [s for a in self.actions for s in a.state_refs()]