Edit on GitHub

sqlglot.planner

  1from __future__ import annotations
  2
  3import itertools
  4import math
  5import typing as t
  6
  7from sqlglot import alias, exp
  8from sqlglot.errors import UnsupportedError
  9from sqlglot.optimizer.eliminate_joins import join_condition
 10
 11
 12class Plan:
 13    def __init__(self, expression: exp.Expression) -> None:
 14        self.expression = expression.copy()
 15        self.root = Step.from_expression(self.expression)
 16        self._dag: t.Dict[Step, t.Set[Step]] = {}
 17
 18    @property
 19    def dag(self) -> t.Dict[Step, t.Set[Step]]:
 20        if not self._dag:
 21            dag: t.Dict[Step, t.Set[Step]] = {}
 22            nodes = {self.root}
 23
 24            while nodes:
 25                node = nodes.pop()
 26                dag[node] = set()
 27                for dep in node.dependencies:
 28                    dag[node].add(dep)
 29                    nodes.add(dep)
 30            self._dag = dag
 31
 32        return self._dag
 33
 34    @property
 35    def leaves(self) -> t.Iterator[Step]:
 36        return (node for node, deps in self.dag.items() if not deps)
 37
 38    def __repr__(self) -> str:
 39        return f"Plan\n----\n{repr(self.root)}"
 40
 41
 42class Step:
 43    @classmethod
 44    def from_expression(
 45        cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
 46    ) -> Step:
 47        """
 48        Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine.
 49        Note: the expression's tables and subqueries must be aliased for this method to work. For
 50        example, given the following expression:
 51
 52        SELECT
 53          x.a,
 54          SUM(x.b)
 55        FROM x AS x
 56        JOIN y AS y
 57          ON x.a = y.a
 58        GROUP BY x.a
 59
 60        the following DAG is produced (the expression IDs might differ per execution):
 61
 62        - Aggregate: x (4347984624)
 63            Context:
 64              Aggregations:
 65                - SUM(x.b)
 66              Group:
 67                - x.a
 68            Projections:
 69              - x.a
 70              - "x".""
 71            Dependencies:
 72            - Join: x (4347985296)
 73              Context:
 74                y:
 75                On: x.a = y.a
 76              Projections:
 77              Dependencies:
 78              - Scan: x (4347983136)
 79                Context:
 80                  Source: x AS x
 81                Projections:
 82              - Scan: y (4343416624)
 83                Context:
 84                  Source: y AS y
 85                Projections:
 86
 87        Args:
 88            expression: the expression to build the DAG from.
 89            ctes: a dictionary that maps CTEs to their corresponding Step DAG by name.
 90
 91        Returns:
 92            A Step DAG corresponding to `expression`.
 93        """
 94        ctes = ctes or {}
 95        with_ = expression.args.get("with")
 96
 97        # CTEs break the mold of scope and introduce themselves to all in the context.
 98        if with_:
 99            ctes = ctes.copy()
100            for cte in with_.expressions:
101                step = Step.from_expression(cte.this, ctes)
102                step.name = cte.alias
103                ctes[step.name] = step  # type: ignore
104
105        from_ = expression.args.get("from")
106
107        if isinstance(expression, exp.Select) and from_:
108            from_ = from_.expressions
109            if len(from_) > 1:
110                raise UnsupportedError(
111                    "Multi-from statements are unsupported. Run it through the optimizer"
112                )
113
114            step = Scan.from_expression(from_[0], ctes)
115        elif isinstance(expression, exp.Union):
116            step = SetOperation.from_expression(expression, ctes)
117        else:
118            step = Scan()
119
120        joins = expression.args.get("joins")
121
122        if joins:
123            join = Join.from_joins(joins, ctes)
124            join.name = step.name
125            join.add_dependency(step)
126            step = join
127
128        projections = []  # final selects in this chain of steps representing a select
129        operands = {}  # intermediate computations of agg funcs eg x + 1 in SUM(x + 1)
130        aggregations = []
131        sequence = itertools.count()
132
133        def extract_agg_operands(expression):
134            for agg in expression.find_all(exp.AggFunc):
135                for operand in agg.unnest_operands():
136                    if isinstance(operand, exp.Column):
137                        continue
138                    if operand not in operands:
139                        operands[operand] = f"_a_{next(sequence)}"
140                    operand.replace(exp.column(operands[operand], quoted=True))
141
142        for e in expression.expressions:
143            if e.find(exp.AggFunc):
144                projections.append(exp.column(e.alias_or_name, step.name, quoted=True))
145                aggregations.append(e)
146                extract_agg_operands(e)
147            else:
148                projections.append(e)
149
150        where = expression.args.get("where")
151
152        if where:
153            step.condition = where.this
154
155        group = expression.args.get("group")
156
157        if group or aggregations:
158            aggregate = Aggregate()
159            aggregate.source = step.name
160            aggregate.name = step.name
161
162            having = expression.args.get("having")
163
164            if having:
165                extract_agg_operands(having)
166                aggregate.condition = having.this
167
168            aggregate.operands = tuple(
169                alias(operand, alias_) for operand, alias_ in operands.items()
170            )
171            aggregate.aggregations = aggregations
172            # give aggregates names and replace projections with references to them
173            aggregate.group = {
174                f"_g{i}": e for i, e in enumerate(group.expressions if group else [])
175            }
176            for projection in projections:
177                for i, e in aggregate.group.items():
178                    for child, *_ in projection.walk():
179                        if child == e:
180                            child.replace(exp.column(i, step.name))
181            aggregate.add_dependency(step)
182            step = aggregate
183
184        order = expression.args.get("order")
185
186        if order:
187            sort = Sort()
188            sort.name = step.name
189            sort.key = order.expressions
190            sort.add_dependency(step)
191            step = sort
192
193        step.projections = projections
194
195        if isinstance(expression, exp.Select) and expression.args.get("distinct"):
196            distinct = Aggregate()
197            distinct.source = step.name
198            distinct.name = step.name
199            distinct.group = {
200                e.alias_or_name: exp.column(col=e.alias_or_name, table=step.name)
201                for e in projections or expression.expressions
202            }
203            distinct.add_dependency(step)
204            step = distinct
205
206        limit = expression.args.get("limit")
207
208        if limit:
209            step.limit = int(limit.text("expression"))
210
211        return step
212
213    def __init__(self) -> None:
214        self.name: t.Optional[str] = None
215        self.dependencies: t.Set[Step] = set()
216        self.dependents: t.Set[Step] = set()
217        self.projections: t.Sequence[exp.Expression] = []
218        self.limit: float = math.inf
219        self.condition: t.Optional[exp.Expression] = None
220
221    def add_dependency(self, dependency: Step) -> None:
222        self.dependencies.add(dependency)
223        dependency.dependents.add(self)
224
225    def __repr__(self) -> str:
226        return self.to_s()
227
228    def to_s(self, level: int = 0) -> str:
229        indent = "  " * level
230        nested = f"{indent}    "
231
232        context = self._to_s(f"{nested}  ")
233
234        if context:
235            context = [f"{nested}Context:"] + context
236
237        lines = [
238            f"{indent}- {self.id}",
239            *context,
240            f"{nested}Projections:",
241        ]
242
243        for expression in self.projections:
244            lines.append(f"{nested}  - {expression.sql()}")
245
246        if self.condition:
247            lines.append(f"{nested}Condition: {self.condition.sql()}")
248
249        if self.limit is not math.inf:
250            lines.append(f"{nested}Limit: {self.limit}")
251
252        if self.dependencies:
253            lines.append(f"{nested}Dependencies:")
254            for dependency in self.dependencies:
255                lines.append("  " + dependency.to_s(level + 1))
256
257        return "\n".join(lines)
258
259    @property
260    def type_name(self) -> str:
261        return self.__class__.__name__
262
263    @property
264    def id(self) -> str:
265        name = self.name
266        name = f" {name}" if name else ""
267        return f"{self.type_name}:{name} ({id(self)})"
268
269    def _to_s(self, _indent: str) -> t.List[str]:
270        return []
271
272
class Scan(Step):
    """Step that reads from a single source expression."""

    @classmethod
    def from_expression(
        cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
    ) -> Step:
        """Build the step for one FROM/JOIN source.

        A subquery is planned recursively and the resulting step is renamed to the
        subquery's alias. Anything else becomes a `Scan`, which depends on the
        matching CTE step when the source's name is a key of `ctes`.
        """
        name = expression.alias_or_name

        if isinstance(expression, exp.Subquery):
            subplan = Step.from_expression(expression.this, ctes)
            subplan.name = name
            return subplan

        scan = Scan()
        scan.name = name
        scan.source = expression

        if ctes and expression.name in ctes:
            scan.add_dependency(ctes[expression.name])

        return scan

    def __init__(self) -> None:
        super().__init__()
        self.source: t.Optional[exp.Expression] = None

    def _to_s(self, indent: str) -> t.List[str]:
        # A scan without a source is rendered with the "-static-" placeholder.
        source_sql = self.source.sql() if self.source else "-static-"  # type: ignore
        return [f"{indent}Source: {source_sql}"]
301
302
class Join(Step):
    """Step that joins its dependencies; `joins` holds the details per joined source."""

    @classmethod
    def from_joins(
        cls, joins: t.Iterable[exp.Join], ctes: t.Optional[t.Dict[str, Step]] = None
    ) -> Step:
        """Create one `Join` step covering every join in `joins`.

        Each joined source is recorded under its alias (or name) and added as a
        dependency through `Scan.from_expression`.
        """
        join_step = Join()

        for join in joins:
            source_key, join_key, condition = join_condition(join)
            joined = join.this

            details = {
                "side": join.side,
                "join_key": join_key,
                "source_key": source_key,
                "condition": condition,
            }
            join_step.joins[joined.alias_or_name] = details
            join_step.add_dependency(Scan.from_expression(joined, ctes))

        return join_step

    def __init__(self) -> None:
        super().__init__()
        self.joins: t.Dict[str, t.Dict[str, t.List[str] | exp.Expression]] = {}

    def _to_s(self, indent: str) -> t.List[str]:
        rendered = []
        for name, details in self.joins.items():
            rendered.append(f"{indent}{name}: {details['side']}")
            condition = details.get("condition")
            if condition:
                rendered.append(f"{indent}On: {condition.sql()}")  # type: ignore
        return rendered
334
335
class Aggregate(Step):
    """Step holding aggregations, their operands, group keys and an optional HAVING condition."""

    def __init__(self) -> None:
        super().__init__()
        self.aggregations: t.List[exp.Expression] = []
        self.operands: t.Tuple[exp.Expression, ...] = ()
        self.group: t.Dict[str, exp.Expression] = {}
        self.source: t.Optional[str] = None

    def _to_s(self, indent: str) -> t.List[str]:
        # Every section is a heading followed by one bullet per expression.
        bullet = f"{indent}  - "

        lines = [f"{indent}Aggregations:"]
        lines.extend(f"{bullet}{aggregation.sql()}" for aggregation in self.aggregations)

        if self.group:
            lines.append(f"{indent}Group:")
            lines.extend(f"{bullet}{key.sql()}" for key in self.group.values())

        if self.condition:
            lines += [f"{indent}Having:", f"{bullet}{self.condition.sql()}"]

        if self.operands:
            lines.append(f"{indent}Operands:")
            lines.extend(f"{bullet}{operand.sql()}" for operand in self.operands)

        return lines
363
364
class Sort(Step):
    """Step that orders its input by the expressions stored in `key`."""

    def __init__(self) -> None:
        super().__init__()
        # None until the sort expressions are assigned.
        self.key: t.Optional[t.Sequence[exp.Expression]] = None

    def _to_s(self, indent: str) -> t.List[str]:
        lines = [f"{indent}Key:"]

        # `key` may still be None, so fall back to an empty sequence; otherwise
        # repr() of a freshly created Sort raises TypeError.
        for expression in self.key or ():
            lines.append(f"{indent}  - {expression.sql()}")

        return lines
377
378
class SetOperation(Step):
    """Step for a set operation between two sub-plans, referenced by step name."""

    def __init__(
        self,
        op: t.Type[exp.Expression],
        left: str | None,
        right: str | None,
        distinct: bool = False,
    ) -> None:
        super().__init__()
        self.op, self.left, self.right = op, left, right
        self.distinct = distinct

    @classmethod
    def from_expression(
        cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
    ) -> Step:
        """Plan both sides of `expression` and tie them together in a set-operation step."""
        assert isinstance(expression, exp.Union)

        left_step = Step.from_expression(expression.left, ctes)
        right_step = Step.from_expression(expression.right, ctes)

        set_op = cls(
            op=expression.__class__,
            left=left_step.name,
            right=right_step.name,
            distinct=bool(expression.args.get("distinct")),
        )
        for side in (left_step, right_step):
            set_op.add_dependency(side)

        return set_op

    def _to_s(self, indent: str) -> t.List[str]:
        # The "Distinct" line appears only when the flag is set.
        return [f"{indent}Distinct: {self.distinct}"] if self.distinct else []

    @property
    def type_name(self) -> str:
        # Labelled with the name of the set-operation expression class.
        return self.op.__name__
class Plan:
class Plan:
    """Execution plan for a SQL expression, exposed as a DAG of `Step` objects.

    The expression is copied on construction and the copy is what gets planned.
    """

    def __init__(self, expression: exp.Expression) -> None:
        self.expression = expression.copy()
        self.root = Step.from_expression(self.expression)
        # Adjacency map, filled lazily by the `dag` property.
        self._dag: t.Dict[Step, t.Set[Step]] = {}

    @property
    def dag(self) -> t.Dict[Step, t.Set[Step]]:
        """Map every step reachable from `root` to the set of its direct dependencies.

        The map is computed on first access and cached on the instance.
        """
        if not self._dag:
            dag: t.Dict[Step, t.Set[Step]] = {}
            pending = {self.root}

            while pending:
                node = pending.pop()
                dag[node] = set(node.dependencies)
                # Queue only steps that have not been expanded yet, so a step
                # shared by several parents is visited a single time.
                pending.update(dep for dep in dag[node] if dep not in dag)
            self._dag = dag

        return self._dag

    @property
    def leaves(self) -> t.Iterator[Step]:
        """Iterate over the steps that have no dependencies."""
        return (node for node, deps in self.dag.items() if not deps)

    def __repr__(self) -> str:
        return f"Plan\n----\n{repr(self.root)}"
Plan(expression: sqlglot.expressions.Expression)
14    def __init__(self, expression: exp.Expression) -> None:
15        self.expression = expression.copy()
16        self.root = Step.from_expression(self.expression)
17        self._dag: t.Dict[Step, t.Set[Step]] = {}
class Step:
 43class Step:
 44    @classmethod
 45    def from_expression(
 46        cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
 47    ) -> Step:
 48        """
 49        Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine.
 50        Note: the expression's tables and subqueries must be aliased for this method to work. For
 51        example, given the following expression:
 52
 53        SELECT
 54          x.a,
 55          SUM(x.b)
 56        FROM x AS x
 57        JOIN y AS y
 58          ON x.a = y.a
 59        GROUP BY x.a
 60
 61        the following DAG is produced (the expression IDs might differ per execution):
 62
 63        - Aggregate: x (4347984624)
 64            Context:
 65              Aggregations:
 66                - SUM(x.b)
 67              Group:
 68                - x.a
 69            Projections:
 70              - x.a
 71              - "x".""
 72            Dependencies:
 73            - Join: x (4347985296)
 74              Context:
 75                y:
 76                On: x.a = y.a
 77              Projections:
 78              Dependencies:
 79              - Scan: x (4347983136)
 80                Context:
 81                  Source: x AS x
 82                Projections:
 83              - Scan: y (4343416624)
 84                Context:
 85                  Source: y AS y
 86                Projections:
 87
 88        Args:
 89            expression: the expression to build the DAG from.
 90            ctes: a dictionary that maps CTEs to their corresponding Step DAG by name.
 91
 92        Returns:
 93            A Step DAG corresponding to `expression`.
 94        """
 95        ctes = ctes or {}
 96        with_ = expression.args.get("with")
 97
 98        # CTEs break the mold of scope and introduce themselves to all in the context.
 99        if with_:
100            ctes = ctes.copy()
101            for cte in with_.expressions:
102                step = Step.from_expression(cte.this, ctes)
103                step.name = cte.alias
104                ctes[step.name] = step  # type: ignore
105
106        from_ = expression.args.get("from")
107
108        if isinstance(expression, exp.Select) and from_:
109            from_ = from_.expressions
110            if len(from_) > 1:
111                raise UnsupportedError(
112                    "Multi-from statements are unsupported. Run it through the optimizer"
113                )
114
115            step = Scan.from_expression(from_[0], ctes)
116        elif isinstance(expression, exp.Union):
117            step = SetOperation.from_expression(expression, ctes)
118        else:
119            step = Scan()
120
121        joins = expression.args.get("joins")
122
123        if joins:
124            join = Join.from_joins(joins, ctes)
125            join.name = step.name
126            join.add_dependency(step)
127            step = join
128
129        projections = []  # final selects in this chain of steps representing a select
130        operands = {}  # intermediate computations of agg funcs eg x + 1 in SUM(x + 1)
131        aggregations = []
132        sequence = itertools.count()
133
134        def extract_agg_operands(expression):
135            for agg in expression.find_all(exp.AggFunc):
136                for operand in agg.unnest_operands():
137                    if isinstance(operand, exp.Column):
138                        continue
139                    if operand not in operands:
140                        operands[operand] = f"_a_{next(sequence)}"
141                    operand.replace(exp.column(operands[operand], quoted=True))
142
143        for e in expression.expressions:
144            if e.find(exp.AggFunc):
145                projections.append(exp.column(e.alias_or_name, step.name, quoted=True))
146                aggregations.append(e)
147                extract_agg_operands(e)
148            else:
149                projections.append(e)
150
151        where = expression.args.get("where")
152
153        if where:
154            step.condition = where.this
155
156        group = expression.args.get("group")
157
158        if group or aggregations:
159            aggregate = Aggregate()
160            aggregate.source = step.name
161            aggregate.name = step.name
162
163            having = expression.args.get("having")
164
165            if having:
166                extract_agg_operands(having)
167                aggregate.condition = having.this
168
169            aggregate.operands = tuple(
170                alias(operand, alias_) for operand, alias_ in operands.items()
171            )
172            aggregate.aggregations = aggregations
173            # give aggregates names and replace projections with references to them
174            aggregate.group = {
175                f"_g{i}": e for i, e in enumerate(group.expressions if group else [])
176            }
177            for projection in projections:
178                for i, e in aggregate.group.items():
179                    for child, *_ in projection.walk():
180                        if child == e:
181                            child.replace(exp.column(i, step.name))
182            aggregate.add_dependency(step)
183            step = aggregate
184
185        order = expression.args.get("order")
186
187        if order:
188            sort = Sort()
189            sort.name = step.name
190            sort.key = order.expressions
191            sort.add_dependency(step)
192            step = sort
193
194        step.projections = projections
195
196        if isinstance(expression, exp.Select) and expression.args.get("distinct"):
197            distinct = Aggregate()
198            distinct.source = step.name
199            distinct.name = step.name
200            distinct.group = {
201                e.alias_or_name: exp.column(col=e.alias_or_name, table=step.name)
202                for e in projections or expression.expressions
203            }
204            distinct.add_dependency(step)
205            step = distinct
206
207        limit = expression.args.get("limit")
208
209        if limit:
210            step.limit = int(limit.text("expression"))
211
212        return step
213
214    def __init__(self) -> None:
215        self.name: t.Optional[str] = None
216        self.dependencies: t.Set[Step] = set()
217        self.dependents: t.Set[Step] = set()
218        self.projections: t.Sequence[exp.Expression] = []
219        self.limit: float = math.inf
220        self.condition: t.Optional[exp.Expression] = None
221
222    def add_dependency(self, dependency: Step) -> None:
223        self.dependencies.add(dependency)
224        dependency.dependents.add(self)
225
226    def __repr__(self) -> str:
227        return self.to_s()
228
229    def to_s(self, level: int = 0) -> str:
230        indent = "  " * level
231        nested = f"{indent}    "
232
233        context = self._to_s(f"{nested}  ")
234
235        if context:
236            context = [f"{nested}Context:"] + context
237
238        lines = [
239            f"{indent}- {self.id}",
240            *context,
241            f"{nested}Projections:",
242        ]
243
244        for expression in self.projections:
245            lines.append(f"{nested}  - {expression.sql()}")
246
247        if self.condition:
248            lines.append(f"{nested}Condition: {self.condition.sql()}")
249
250        if self.limit is not math.inf:
251            lines.append(f"{nested}Limit: {self.limit}")
252
253        if self.dependencies:
254            lines.append(f"{nested}Dependencies:")
255            for dependency in self.dependencies:
256                lines.append("  " + dependency.to_s(level + 1))
257
258        return "\n".join(lines)
259
260    @property
261    def type_name(self) -> str:
262        return self.__class__.__name__
263
264    @property
265    def id(self) -> str:
266        name = self.name
267        name = f" {name}" if name else ""
268        return f"{self.type_name}:{name} ({id(self)})"
269
270    def _to_s(self, _indent: str) -> t.List[str]:
271        return []
@classmethod
def from_expression( cls, expression: sqlglot.expressions.Expression, ctes: Optional[Dict[str, sqlglot.planner.Step]] = None) -> sqlglot.planner.Step:
    @classmethod
    def from_expression(
        cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
    ) -> Step:
        """
        Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine.
        Note: the expression's tables and subqueries must be aliased for this method to work. For
        example, given the following expression:

        SELECT
          x.a,
          SUM(x.b)
        FROM x AS x
        JOIN y AS y
          ON x.a = y.a
        GROUP BY x.a

        the following DAG is produced (the expression IDs might differ per execution):

        - Aggregate: x (4347984624)
            Context:
              Aggregations:
                - SUM(x.b)
              Group:
                - x.a
            Projections:
              - x.a
              - "x".""
            Dependencies:
            - Join: x (4347985296)
              Context:
                y:
                On: x.a = y.a
              Projections:
              Dependencies:
              - Scan: x (4347983136)
                Context:
                  Source: x AS x
                Projections:
              - Scan: y (4343416624)
                Context:
                  Source: y AS y
                Projections:

        Args:
            expression: the expression to build the DAG from.
            ctes: a dictionary that maps CTEs to their corresponding Step DAG by name.

        Returns:
            A Step DAG corresponding to `expression`.

        Raises:
            UnsupportedError: if the FROM clause holds more than one expression.
        """
        ctes = ctes or {}
        with_ = expression.args.get("with")

        # CTEs break the mold of scope and introduce themselves to all in the context.
        if with_:
            # Work on a copy so CTEs registered here do not leak into the caller's mapping.
            ctes = ctes.copy()
            for cte in with_.expressions:
                step = Step.from_expression(cte.this, ctes)
                step.name = cte.alias
                ctes[step.name] = step  # type: ignore

        from_ = expression.args.get("from")

        if isinstance(expression, exp.Select) and from_:
            from_ = from_.expressions
            if len(from_) > 1:
                raise UnsupportedError(
                    "Multi-from statements are unsupported. Run it through the optimizer"
                )

            step = Scan.from_expression(from_[0], ctes)
        elif isinstance(expression, exp.Union):
            step = SetOperation.from_expression(expression, ctes)
        else:
            # Neither a SELECT with a FROM clause nor a set operation: scan with no source.
            step = Scan()

        joins = expression.args.get("joins")

        if joins:
            # The Join step wraps the source step and takes over its name.
            join = Join.from_joins(joins, ctes)
            join.name = step.name
            join.add_dependency(step)
            step = join

        projections = []  # final selects in this chain of steps representing a select
        operands = {}  # intermediate computations of agg funcs eg x + 1 in SUM(x + 1)
        aggregations = []
        sequence = itertools.count()

        def extract_agg_operands(expression):
            # Swap each non-column operand of every aggregate function for a generated
            # `_a_<n>` column (in place) and remember the original operand in `operands`.
            for agg in expression.find_all(exp.AggFunc):
                for operand in agg.unnest_operands():
                    if isinstance(operand, exp.Column):
                        continue
                    if operand not in operands:
                        operands[operand] = f"_a_{next(sequence)}"
                    operand.replace(exp.column(operands[operand], quoted=True))

        for e in expression.expressions:
            if e.find(exp.AggFunc):
                # Aggregated selects are projected as a column named after the select.
                projections.append(exp.column(e.alias_or_name, step.name, quoted=True))
                aggregations.append(e)
                extract_agg_operands(e)
            else:
                projections.append(e)

        where = expression.args.get("where")

        if where:
            # The WHERE filter is attached to the step built so far (scan, set op or join).
            step.condition = where.this

        group = expression.args.get("group")

        if group or aggregations:
            aggregate = Aggregate()
            aggregate.source = step.name
            aggregate.name = step.name

            having = expression.args.get("having")

            if having:
                extract_agg_operands(having)
                aggregate.condition = having.this

            aggregate.operands = tuple(
                alias(operand, alias_) for operand, alias_ in operands.items()
            )
            aggregate.aggregations = aggregations
            # give aggregates names and replace projections with references to them
            aggregate.group = {
                f"_g{i}": e for i, e in enumerate(group.expressions if group else [])
            }
            for projection in projections:
                for i, e in aggregate.group.items():
                    for child, *_ in projection.walk():
                        if child == e:
                            child.replace(exp.column(i, step.name))
            aggregate.add_dependency(step)
            step = aggregate

        order = expression.args.get("order")

        if order:
            sort = Sort()
            sort.name = step.name
            sort.key = order.expressions
            sort.add_dependency(step)
            step = sort

        # Projections are set on the outermost step built up to this point.
        step.projections = projections

        if isinstance(expression, exp.Select) and expression.args.get("distinct"):
            # DISTINCT is planned as an Aggregate that groups by every projection.
            distinct = Aggregate()
            distinct.source = step.name
            distinct.name = step.name
            distinct.group = {
                e.alias_or_name: exp.column(col=e.alias_or_name, table=step.name)
                for e in projections or expression.expressions
            }
            distinct.add_dependency(step)
            step = distinct

        limit = expression.args.get("limit")

        if limit:
            step.limit = int(limit.text("expression"))

        return step

Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine. Note: the expression's tables and subqueries must be aliased for this method to work. For example, given the following expression:

SELECT x.a, SUM(x.b) FROM x AS x JOIN y AS y ON x.a = y.a GROUP BY x.a

the following DAG is produced (the expression IDs might differ per execution):

- Aggregate: x (4347984624)
    Context:
      Aggregations:
        - SUM(x.b)
      Group:
        - x.a
    Projections:
      - x.a
      - "x".""
    Dependencies:
    - Join: x (4347985296)
      Context:
        y:
        On: x.a = y.a
      Projections:
      Dependencies:
      - Scan: x (4347983136)
        Context:
          Source: x AS x
        Projections:
      - Scan: y (4343416624)
        Context:
          Source: y AS y
        Projections:
Arguments:
  • expression: the expression to build the DAG from.
  • ctes: a dictionary that maps CTEs to their corresponding Step DAG by name.
Returns:

A Step DAG corresponding to expression.

def add_dependency(self, dependency: sqlglot.planner.Step) -> None:
222    def add_dependency(self, dependency: Step) -> None:
223        self.dependencies.add(dependency)
224        dependency.dependents.add(self)
def to_s(self, level: int = 0) -> str:
229    def to_s(self, level: int = 0) -> str:
230        indent = "  " * level
231        nested = f"{indent}    "
232
233        context = self._to_s(f"{nested}  ")
234
235        if context:
236            context = [f"{nested}Context:"] + context
237
238        lines = [
239            f"{indent}- {self.id}",
240            *context,
241            f"{nested}Projections:",
242        ]
243
244        for expression in self.projections:
245            lines.append(f"{nested}  - {expression.sql()}")
246
247        if self.condition:
248            lines.append(f"{nested}Condition: {self.condition.sql()}")
249
250        if self.limit is not math.inf:
251            lines.append(f"{nested}Limit: {self.limit}")
252
253        if self.dependencies:
254            lines.append(f"{nested}Dependencies:")
255            for dependency in self.dependencies:
256                lines.append("  " + dependency.to_s(level + 1))
257
258        return "\n".join(lines)
class Scan(Step):
class Scan(Step):
    """Step that reads from a single source expression."""

    def __init__(self) -> None:
        super().__init__()
        # Expression being scanned; stays None until `from_expression` assigns it.
        self.source: t.Optional[exp.Expression] = None

    @classmethod
    def from_expression(
        cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
    ) -> Step:
        """
        Build the step for `expression`.

        A subquery is planned through `Step.from_expression` and the resulting step is
        renamed to the subquery's alias. Anything else becomes a `Scan` over
        `expression`, which depends on the matching entry of `ctes` when the source's
        name is found there.

        Args:
            expression: the source expression to build the step from.
            ctes: steps of already planned CTEs, keyed by name.

        Returns:
            The step for the subquery's inner expression, or a new `Scan`.
        """
        name = expression.alias_or_name

        if isinstance(expression, exp.Subquery):
            inner = Step.from_expression(expression.this, ctes)
            inner.name = name
            return inner

        scan = Scan()
        scan.name = name
        scan.source = expression

        if ctes:
            source_name = expression.name
            if source_name in ctes:
                scan.add_dependency(ctes[source_name])

        return scan

    def _to_s(self, indent: str) -> t.List[str]:
        """Return the single "Source:" line, using '-static-' when no source is set."""
        rendered = self.source.sql() if self.source else "-static-"
        return [f"{indent}Source: {rendered}"]
@classmethod
def from_expression( cls, expression: sqlglot.expressions.Expression, ctes: Optional[Dict[str, sqlglot.planner.Step]] = None) -> sqlglot.planner.Step:
275    @classmethod
276    def from_expression(
277        cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
278    ) -> Step:
279        table = expression
280        alias_ = expression.alias_or_name
281
282        if isinstance(expression, exp.Subquery):
283            table = expression.this
284            step = Step.from_expression(table, ctes)
285            step.name = alias_
286            return step
287
288        step = Scan()
289        step.name = alias_
290        step.source = expression
291        if ctes and table.name in ctes:
292            step.add_dependency(ctes[table.name])
293
294        return step

Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine. Note: the expression's tables and subqueries must be aliased for this method to work. For example, given the following expression:

SELECT x.a, SUM(x.b) FROM x AS x JOIN y AS y ON x.a = y.a GROUP BY x.a

the following DAG is produced (the expression IDs might differ per execution):

- Aggregate: x (4347984624)
    Context:
      Aggregations:
        - SUM(x.b)
      Group:
        - x.a
    Projections:
      - x.a
      - "x".""
    Dependencies:
    - Join: x (4347985296)
      Context:
        y:
        On: x.a = y.a
      Projections:
      Dependencies:
      - Scan: x (4347983136)
        Context:
          Source: x AS x
        Projections:
      - Scan: y (4343416624)
        Context:
          Source: y AS y
        Projections:
Arguments:
  • expression: the expression to build the DAG from.
  • ctes: a dictionary that maps CTEs to their corresponding Step DAG by name.
Returns:

A Step DAG corresponding to expression.

Inherited Members
Step
add_dependency
to_s
class Join(Step):
class Join(Step):
    """Step that combines several joined sources, each tracked by its alias or name."""

    def __init__(self) -> None:
        super().__init__()
        # Per-source join details (side, keys and condition), keyed by alias or name.
        self.joins: t.Dict[str, t.Dict[str, t.List[str] | exp.Expression]] = {}

    @classmethod
    def from_joins(
        cls, joins: t.Iterable[exp.Join], ctes: t.Optional[t.Dict[str, Step]] = None
    ) -> Step:
        """
        Build one `Join` step covering every join in `joins`.

        Each join's keys and condition come from `join_condition`, and the joined
        source is added as a dependency built by `Scan.from_expression`.

        Args:
            joins: the join expressions to plan.
            ctes: steps of already planned CTEs, keyed by name.

        Returns:
            The populated `Join` step.
        """
        plan = Join()

        for node in joins:
            source_key, join_key, condition = join_condition(node)
            target = node.this
            plan.joins[target.alias_or_name] = dict(
                side=node.side,
                join_key=join_key,
                source_key=source_key,
                condition=condition,
            )
            plan.add_dependency(Scan.from_expression(target, ctes))

        return plan

    def _to_s(self, indent: str) -> t.List[str]:
        """List each joined source with its side, plus an "On:" line when it has a condition."""
        rendered: t.List[str] = []

        for name, details in self.joins.items():
            rendered.append(f"{indent}{name}: {details['side']}")
            if details.get("condition"):
                rendered.append(f"{indent}On: {details['condition'].sql()}")  # type: ignore

        return rendered
@classmethod
def from_joins( cls, joins: Iterable[sqlglot.expressions.Join], ctes: Optional[Dict[str, sqlglot.planner.Step]] = None) -> sqlglot.planner.Step:
305    @classmethod
306    def from_joins(
307        cls, joins: t.Iterable[exp.Join], ctes: t.Optional[t.Dict[str, Step]] = None
308    ) -> Step:
309        step = Join()
310
311        for join in joins:
312            source_key, join_key, condition = join_condition(join)
313            step.joins[join.this.alias_or_name] = {
314                "side": join.side,
315                "join_key": join_key,
316                "source_key": source_key,
317                "condition": condition,
318            }
319
320            step.add_dependency(Scan.from_expression(join.this, ctes))
321
322        return step
class Aggregate(Step):
class Aggregate(Step):
    """Step holding aggregation expressions together with their grouping and operands."""

    def __init__(self) -> None:
        super().__init__()
        self.aggregations: t.List[exp.Expression] = []
        self.operands: t.Tuple[exp.Expression, ...] = ()
        self.group: t.Dict[str, exp.Expression] = {}
        self.source: t.Optional[str] = None

    def _to_s(self, indent: str) -> t.List[str]:
        """
        Describe the aggregation as outline lines.

        "Aggregations:" is always present; "Group:", "Having:" and "Operands:" are
        only emitted when the corresponding attribute is set / non-empty.
        """
        bullet = f"{indent}  - "

        lines = [f"{indent}Aggregations:"]
        lines.extend(f"{bullet}{aggregation.sql()}" for aggregation in self.aggregations)

        if self.group:
            lines.append(f"{indent}Group:")
            lines.extend(f"{bullet}{grouped.sql()}" for grouped in self.group.values())

        if self.condition:
            lines += [f"{indent}Having:", f"{bullet}{self.condition.sql()}"]

        if self.operands:
            lines.append(f"{indent}Operands:")
            lines.extend(f"{bullet}{operand.sql()}" for operand in self.operands)

        return lines
class Sort(Step):
class Sort(Step):
    """Step whose `key` holds the expressions rendered under "Key:"."""

    def __init__(self) -> None:
        super().__init__()
        # Not set here; presumably assigned by the code that builds the step.
        self.key = None

    def _to_s(self, indent: str) -> t.List[str]:
        """
        Describe the sort key as outline lines.

        Args:
            indent: prefix prepended to every line.

        Returns:
            A "Key:" header followed by one bullet per key expression; only the
            header when no key has been assigned.
        """
        lines = [f"{indent}Key:"]

        # `key` is None until assigned; treat that as "no expressions" so rendering
        # a freshly constructed step does not raise TypeError.
        for expression in self.key or ():  # type: ignore
            lines.append(f"{indent}  - {expression.sql()}")

        return lines
class SetOperation(Step):
class SetOperation(Step):
    """Step that combines the results of two other steps, referenced by name."""

    def __init__(
        self,
        op: t.Type[exp.Expression],
        left: str | None,
        right: str | None,
        distinct: bool = False,
    ) -> None:
        """
        Args:
            op: the expression class of the set operation.
            left: name of the step providing the left operand.
            right: name of the step providing the right operand.
            distinct: the operation's distinct flag.
        """
        super().__init__()
        self.op, self.left = op, left
        self.right, self.distinct = right, distinct

    @classmethod
    def from_expression(
        cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
    ) -> Step:
        """
        Plan both sides of a union expression and tie them together in one step.

        Args:
            expression: the expression to plan; must be an `exp.Union`.
            ctes: steps of already planned CTEs, keyed by name.

        Returns:
            A step depending on the steps built for the left and right sides.
        """
        assert isinstance(expression, exp.Union)

        lhs = Step.from_expression(expression.left, ctes)
        rhs = Step.from_expression(expression.right, ctes)

        is_distinct = bool(expression.args.get("distinct"))
        combined = cls(
            op=expression.__class__, left=lhs.name, right=rhs.name, distinct=is_distinct
        )

        for operand in (lhs, rhs):
            combined.add_dependency(operand)

        return combined

    def _to_s(self, indent: str) -> t.List[str]:
        """Return a "Distinct:" line when the distinct flag is set, otherwise nothing."""
        return [f"{indent}Distinct: {self.distinct}"] if self.distinct else []

    @property
    def type_name(self) -> str:
        """Name of the expression class stored in `op`."""
        return self.op.__name__
SetOperation( op: Type[sqlglot.expressions.Expression], left: str | None, right: str | None, distinct: bool = False)
381    def __init__(
382        self,
383        op: t.Type[exp.Expression],
384        left: str | None,
385        right: str | None,
386        distinct: bool = False,
387    ) -> None:
388        super().__init__()
389        self.op = op
390        self.left = left
391        self.right = right
392        self.distinct = distinct
@classmethod
def from_expression( cls, expression: sqlglot.expressions.Expression, ctes: Optional[Dict[str, sqlglot.planner.Step]] = None) -> sqlglot.planner.Step:
394    @classmethod
395    def from_expression(
396        cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
397    ) -> Step:
398        assert isinstance(expression, exp.Union)
399        left = Step.from_expression(expression.left, ctes)
400        right = Step.from_expression(expression.right, ctes)
401        step = cls(
402            op=expression.__class__,
403            left=left.name,
404            right=right.name,
405            distinct=bool(expression.args.get("distinct")),
406        )
407        step.add_dependency(left)
408        step.add_dependency(right)
409        return step

Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine. Note: the expression's tables and subqueries must be aliased for this method to work. For example, given the following expression:

SELECT x.a, SUM(x.b) FROM x AS x JOIN y AS y ON x.a = y.a GROUP BY x.a

the following DAG is produced (the expression IDs might differ per execution):

- Aggregate: x (4347984624)
    Context:
      Aggregations:
        - SUM(x.b)
      Group:
        - x.a
    Projections:
      - x.a
      - "x".""
    Dependencies:
    - Join: x (4347985296)
      Context:
        y:
        On: x.a = y.a
      Projections:
      Dependencies:
      - Scan: x (4347983136)
        Context:
          Source: x AS x
        Projections:
      - Scan: y (4343416624)
        Context:
          Source: y AS y
        Projections:
Arguments:
  • expression: the expression to build the DAG from.
  • ctes: a dictionary that maps CTEs to their corresponding Step DAG by name.
Returns:

A Step DAG corresponding to expression.

Inherited Members
Step
add_dependency
to_s