sqlglot.planner
1from __future__ import annotations 2 3import itertools 4import math 5import typing as t 6 7from sqlglot import alias, exp 8from sqlglot.optimizer.eliminate_joins import join_condition 9 10 11class Plan: 12 def __init__(self, expression: exp.Expression) -> None: 13 self.expression = expression.copy() 14 self.root = Step.from_expression(self.expression) 15 self._dag: t.Dict[Step, t.Set[Step]] = {} 16 17 @property 18 def dag(self) -> t.Dict[Step, t.Set[Step]]: 19 if not self._dag: 20 dag: t.Dict[Step, t.Set[Step]] = {} 21 nodes = {self.root} 22 23 while nodes: 24 node = nodes.pop() 25 dag[node] = set() 26 for dep in node.dependencies: 27 dag[node].add(dep) 28 nodes.add(dep) 29 self._dag = dag 30 31 return self._dag 32 33 @property 34 def leaves(self) -> t.Iterator[Step]: 35 return (node for node, deps in self.dag.items() if not deps) 36 37 def __repr__(self) -> str: 38 return f"Plan\n----\n{repr(self.root)}" 39 40 41class Step: 42 @classmethod 43 def from_expression( 44 cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None 45 ) -> Step: 46 """ 47 Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine. 48 Note: the expression's tables and subqueries must be aliased for this method to work. For 49 example, given the following expression: 50 51 SELECT 52 x.a, 53 SUM(x.b) 54 FROM x AS x 55 JOIN y AS y 56 ON x.a = y.a 57 GROUP BY x.a 58 59 the following DAG is produced (the expression IDs might differ per execution): 60 61 - Aggregate: x (4347984624) 62 Context: 63 Aggregations: 64 - SUM(x.b) 65 Group: 66 - x.a 67 Projections: 68 - x.a 69 - "x"."" 70 Dependencies: 71 - Join: x (4347985296) 72 Context: 73 y: 74 On: x.a = y.a 75 Projections: 76 Dependencies: 77 - Scan: x (4347983136) 78 Context: 79 Source: x AS x 80 Projections: 81 - Scan: y (4343416624) 82 Context: 83 Source: y AS y 84 Projections: 85 86 Args: 87 expression: the expression to build the DAG from. 
88 ctes: a dictionary that maps CTEs to their corresponding Step DAG by name. 89 90 Returns: 91 A Step DAG corresponding to `expression`. 92 """ 93 ctes = ctes or {} 94 with_ = expression.args.get("with") 95 96 # CTEs break the mold of scope and introduce themselves to all in the context. 97 if with_: 98 ctes = ctes.copy() 99 for cte in with_.expressions: 100 step = Step.from_expression(cte.this, ctes) 101 step.name = cte.alias 102 ctes[step.name] = step # type: ignore 103 104 from_ = expression.args.get("from") 105 106 if isinstance(expression, exp.Select) and from_: 107 step = Scan.from_expression(from_.this, ctes) 108 elif isinstance(expression, exp.Union): 109 step = SetOperation.from_expression(expression, ctes) 110 else: 111 step = Scan() 112 113 joins = expression.args.get("joins") 114 115 if joins: 116 join = Join.from_joins(joins, ctes) 117 join.name = step.name 118 join.add_dependency(step) 119 step = join 120 121 projections = [] # final selects in this chain of steps representing a select 122 operands = {} # intermediate computations of agg funcs eg x + 1 in SUM(x + 1) 123 aggregations = [] 124 sequence = itertools.count() 125 126 def extract_agg_operands(expression): 127 for agg in expression.find_all(exp.AggFunc): 128 for operand in agg.unnest_operands(): 129 if isinstance(operand, exp.Column): 130 continue 131 if operand not in operands: 132 operands[operand] = f"_a_{next(sequence)}" 133 operand.replace(exp.column(operands[operand], quoted=True)) 134 135 for e in expression.expressions: 136 if e.find(exp.AggFunc): 137 projections.append(exp.column(e.alias_or_name, step.name, quoted=True)) 138 aggregations.append(e) 139 extract_agg_operands(e) 140 else: 141 projections.append(e) 142 143 where = expression.args.get("where") 144 145 if where: 146 step.condition = where.this 147 148 group = expression.args.get("group") 149 150 if group or aggregations: 151 aggregate = Aggregate() 152 aggregate.source = step.name 153 aggregate.name = step.name 154 155 
having = expression.args.get("having") 156 157 if having: 158 extract_agg_operands(having) 159 aggregate.condition = having.this 160 161 aggregate.operands = tuple( 162 alias(operand, alias_) for operand, alias_ in operands.items() 163 ) 164 aggregate.aggregations = aggregations 165 # give aggregates names and replace projections with references to them 166 aggregate.group = { 167 f"_g{i}": e for i, e in enumerate(group.expressions if group else []) 168 } 169 for projection in projections: 170 for i, e in aggregate.group.items(): 171 for child, *_ in projection.walk(): 172 if child == e: 173 child.replace(exp.column(i, step.name)) 174 aggregate.add_dependency(step) 175 step = aggregate 176 177 order = expression.args.get("order") 178 179 if order: 180 sort = Sort() 181 sort.name = step.name 182 sort.key = order.expressions 183 sort.add_dependency(step) 184 step = sort 185 186 step.projections = projections 187 188 if isinstance(expression, exp.Select) and expression.args.get("distinct"): 189 distinct = Aggregate() 190 distinct.source = step.name 191 distinct.name = step.name 192 distinct.group = { 193 e.alias_or_name: exp.column(col=e.alias_or_name, table=step.name) 194 for e in projections or expression.expressions 195 } 196 distinct.add_dependency(step) 197 step = distinct 198 199 limit = expression.args.get("limit") 200 201 if limit: 202 step.limit = int(limit.text("expression")) 203 204 return step 205 206 def __init__(self) -> None: 207 self.name: t.Optional[str] = None 208 self.dependencies: t.Set[Step] = set() 209 self.dependents: t.Set[Step] = set() 210 self.projections: t.Sequence[exp.Expression] = [] 211 self.limit: float = math.inf 212 self.condition: t.Optional[exp.Expression] = None 213 214 def add_dependency(self, dependency: Step) -> None: 215 self.dependencies.add(dependency) 216 dependency.dependents.add(self) 217 218 def __repr__(self) -> str: 219 return self.to_s() 220 221 def to_s(self, level: int = 0) -> str: 222 indent = " " * level 223 nested 
= f"{indent} " 224 225 context = self._to_s(f"{nested} ") 226 227 if context: 228 context = [f"{nested}Context:"] + context 229 230 lines = [ 231 f"{indent}- {self.id}", 232 *context, 233 f"{nested}Projections:", 234 ] 235 236 for expression in self.projections: 237 lines.append(f"{nested} - {expression.sql()}") 238 239 if self.condition: 240 lines.append(f"{nested}Condition: {self.condition.sql()}") 241 242 if self.limit is not math.inf: 243 lines.append(f"{nested}Limit: {self.limit}") 244 245 if self.dependencies: 246 lines.append(f"{nested}Dependencies:") 247 for dependency in self.dependencies: 248 lines.append(" " + dependency.to_s(level + 1)) 249 250 return "\n".join(lines) 251 252 @property 253 def type_name(self) -> str: 254 return self.__class__.__name__ 255 256 @property 257 def id(self) -> str: 258 name = self.name 259 name = f" {name}" if name else "" 260 return f"{self.type_name}:{name} ({id(self)})" 261 262 def _to_s(self, _indent: str) -> t.List[str]: 263 return [] 264 265 266class Scan(Step): 267 @classmethod 268 def from_expression( 269 cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None 270 ) -> Step: 271 table = expression 272 alias_ = expression.alias_or_name 273 274 if isinstance(expression, exp.Subquery): 275 table = expression.this 276 step = Step.from_expression(table, ctes) 277 step.name = alias_ 278 return step 279 280 step = Scan() 281 step.name = alias_ 282 step.source = expression 283 if ctes and table.name in ctes: 284 step.add_dependency(ctes[table.name]) 285 286 return step 287 288 def __init__(self) -> None: 289 super().__init__() 290 self.source: t.Optional[exp.Expression] = None 291 292 def _to_s(self, indent: str) -> t.List[str]: 293 return [f"{indent}Source: {self.source.sql() if self.source else '-static-'}"] # type: ignore 294 295 296class Join(Step): 297 @classmethod 298 def from_joins( 299 cls, joins: t.Iterable[exp.Join], ctes: t.Optional[t.Dict[str, Step]] = None 300 ) -> Step: 301 step = Join() 302 
303 for join in joins: 304 source_key, join_key, condition = join_condition(join) 305 step.joins[join.this.alias_or_name] = { 306 "side": join.side, # type: ignore 307 "join_key": join_key, 308 "source_key": source_key, 309 "condition": condition, 310 } 311 312 step.add_dependency(Scan.from_expression(join.this, ctes)) 313 314 return step 315 316 def __init__(self) -> None: 317 super().__init__() 318 self.joins: t.Dict[str, t.Dict[str, t.List[str] | exp.Expression]] = {} 319 320 def _to_s(self, indent: str) -> t.List[str]: 321 lines = [] 322 for name, join in self.joins.items(): 323 lines.append(f"{indent}{name}: {join['side']}") 324 if join.get("condition"): 325 lines.append(f"{indent}On: {join['condition'].sql()}") # type: ignore 326 return lines 327 328 329class Aggregate(Step): 330 def __init__(self) -> None: 331 super().__init__() 332 self.aggregations: t.List[exp.Expression] = [] 333 self.operands: t.Tuple[exp.Expression, ...] = () 334 self.group: t.Dict[str, exp.Expression] = {} 335 self.source: t.Optional[str] = None 336 337 def _to_s(self, indent: str) -> t.List[str]: 338 lines = [f"{indent}Aggregations:"] 339 340 for expression in self.aggregations: 341 lines.append(f"{indent} - {expression.sql()}") 342 343 if self.group: 344 lines.append(f"{indent}Group:") 345 for expression in self.group.values(): 346 lines.append(f"{indent} - {expression.sql()}") 347 if self.condition: 348 lines.append(f"{indent}Having:") 349 lines.append(f"{indent} - {self.condition.sql()}") 350 if self.operands: 351 lines.append(f"{indent}Operands:") 352 for expression in self.operands: 353 lines.append(f"{indent} - {expression.sql()}") 354 355 return lines 356 357 358class Sort(Step): 359 def __init__(self) -> None: 360 super().__init__() 361 self.key = None 362 363 def _to_s(self, indent: str) -> t.List[str]: 364 lines = [f"{indent}Key:"] 365 366 for expression in self.key: # type: ignore 367 lines.append(f"{indent} - {expression.sql()}") 368 369 return lines 370 371 372class 
SetOperation(Step): 373 def __init__( 374 self, 375 op: t.Type[exp.Expression], 376 left: str | None, 377 right: str | None, 378 distinct: bool = False, 379 ) -> None: 380 super().__init__() 381 self.op = op 382 self.left = left 383 self.right = right 384 self.distinct = distinct 385 386 @classmethod 387 def from_expression( 388 cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None 389 ) -> Step: 390 assert isinstance(expression, exp.Union) 391 left = Step.from_expression(expression.left, ctes) 392 right = Step.from_expression(expression.right, ctes) 393 step = cls( 394 op=expression.__class__, 395 left=left.name, 396 right=right.name, 397 distinct=bool(expression.args.get("distinct")), 398 ) 399 step.add_dependency(left) 400 step.add_dependency(right) 401 return step 402 403 def _to_s(self, indent: str) -> t.List[str]: 404 lines = [] 405 if self.distinct: 406 lines.append(f"{indent}Distinct: {self.distinct}") 407 return lines 408 409 @property 410 def type_name(self) -> str: 411 return self.op.__name__
class Plan:
    """Plan for a SQL expression, exposed as a DAG of `Step` objects."""

    def __init__(self, expression: exp.Expression) -> None:
        """Copy `expression` and build the root step from the copy."""
        self.expression = expression.copy()
        self.root = Step.from_expression(self.expression)
        # Filled lazily by the `dag` property.
        self._dag: t.Dict[Step, t.Set[Step]] = {}

    @property
    def dag(self) -> t.Dict[Step, t.Set[Step]]:
        """Map every step reachable from `root` to its direct dependencies.

        The mapping is computed on first access and cached in `_dag`.
        """
        if not self._dag:
            dag: t.Dict[Step, t.Set[Step]] = {}
            pending = {self.root}

            while pending:
                node = pending.pop()
                deps = set(node.dependencies)
                dag[node] = deps
                # A step shared by several dependents is expanded only once;
                # previously it was re-queued every time it was reached.
                pending.update(dep for dep in deps if dep not in dag)

            self._dag = dag

        return self._dag

    @property
    def leaves(self) -> t.Iterator[Step]:
        """Iterate over the steps that have no dependencies."""
        return (node for node, deps in self.dag.items() if not deps)

    def __repr__(self) -> str:
        return f"Plan\n----\n{repr(self.root)}"
class Step:
    """Base node of the plan DAG; subclasses add step-specific context."""

    @classmethod
    def from_expression(
        cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
    ) -> Step:
        """
        Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine.
        Note: the expression's tables and subqueries must be aliased for this method to work. For
        example, given the following expression:

        SELECT
          x.a,
          SUM(x.b)
        FROM x AS x
        JOIN y AS y
          ON x.a = y.a
        GROUP BY x.a

        the following DAG is produced (the expression IDs might differ per execution):

        - Aggregate: x (4347984624)
            Context:
              Aggregations:
                - SUM(x.b)
              Group:
                - x.a
            Projections:
              - x.a
              - "x".""
            Dependencies:
            - Join: x (4347985296)
              Context:
                y:
                On: x.a = y.a
              Projections:
              Dependencies:
              - Scan: x (4347983136)
                Context:
                  Source: x AS x
                Projections:
              - Scan: y (4343416624)
                Context:
                  Source: y AS y
                Projections:

        Args:
            expression: the expression to build the DAG from.
            ctes: a dictionary that maps CTEs to their corresponding Step DAG by name.

        Returns:
            A Step DAG corresponding to `expression`.
        """
        ctes = ctes or {}
        with_ = expression.args.get("with")

        # CTEs break the mold of scope and introduce themselves to all in the context.
        if with_:
            # Work on a copy so the caller's mapping is not modified.
            ctes = ctes.copy()
            for cte in with_.expressions:
                step = Step.from_expression(cte.this, ctes)
                step.name = cte.alias
                ctes[step.name] = step  # type: ignore

        from_ = expression.args.get("from")

        # Pick the starting step: a scan of the FROM source, a set operation,
        # or a bare Scan with no source when neither applies.
        if isinstance(expression, exp.Select) and from_:
            step = Scan.from_expression(from_.this, ctes)
        elif isinstance(expression, exp.Union):
            step = SetOperation.from_expression(expression, ctes)
        else:
            step = Scan()

        joins = expression.args.get("joins")

        if joins:
            # The join step takes over the name of the step it wraps.
            join = Join.from_joins(joins, ctes)
            join.name = step.name
            join.add_dependency(step)
            step = join

        projections = []  # final selects in this chain of steps representing a select
        operands = {}  # intermediate computations of agg funcs eg x + 1 in SUM(x + 1)
        aggregations = []
        sequence = itertools.count()

        def extract_agg_operands(expression):
            # Every non-column operand of an aggregate function gets an `_a_<n>` alias
            # (reused when the same operand was seen before) and is swapped for a
            # quoted column that refers to that alias.
            for agg in expression.find_all(exp.AggFunc):
                for operand in agg.unnest_operands():
                    if isinstance(operand, exp.Column):
                        continue
                    if operand not in operands:
                        operands[operand] = f"_a_{next(sequence)}"
                    # NOTE(review): nesting reconstructed from a whitespace-collapsed
                    # listing; confirm this call sits outside the `if` above.
                    operand.replace(exp.column(operands[operand], quoted=True))

        for e in expression.expressions:
            if e.find(exp.AggFunc):
                # Aggregated selects are projected as a column named after the select.
                projections.append(exp.column(e.alias_or_name, step.name, quoted=True))
                aggregations.append(e)
                extract_agg_operands(e)
            else:
                projections.append(e)

        where = expression.args.get("where")

        if where:
            step.condition = where.this

        group = expression.args.get("group")

        if group or aggregations:
            aggregate = Aggregate()
            aggregate.source = step.name
            aggregate.name = step.name

            having = expression.args.get("having")

            if having:
                extract_agg_operands(having)
                aggregate.condition = having.this

            aggregate.operands = tuple(
                alias(operand, alias_) for operand, alias_ in operands.items()
            )
            aggregate.aggregations = aggregations
            # give aggregates names and replace projections with references to them
            aggregate.group = {
                f"_g{i}": e for i, e in enumerate(group.expressions if group else [])
            }
            for projection in projections:
                for i, e in aggregate.group.items():
                    for child, *_ in projection.walk():
                        if child == e:
                            child.replace(exp.column(i, step.name))
            aggregate.add_dependency(step)
            step = aggregate

        order = expression.args.get("order")

        if order:
            sort = Sort()
            sort.name = step.name
            sort.key = order.expressions
            sort.add_dependency(step)
            step = sort

        # Projections are attached to whichever step is outermost at this point.
        step.projections = projections

        if isinstance(expression, exp.Select) and expression.args.get("distinct"):
            # DISTINCT is expressed as an Aggregate grouping on every projection.
            distinct = Aggregate()
            distinct.source = step.name
            distinct.name = step.name
            distinct.group = {
                e.alias_or_name: exp.column(col=e.alias_or_name, table=step.name)
                for e in projections or expression.expressions
            }
            distinct.add_dependency(step)
            step = distinct

        limit = expression.args.get("limit")

        if limit:
            step.limit = int(limit.text("expression"))

        return step

    def __init__(self) -> None:
        self.name: t.Optional[str] = None
        # Edges are stored in both directions; see `add_dependency`.
        self.dependencies: t.Set[Step] = set()
        self.dependents: t.Set[Step] = set()
        self.projections: t.Sequence[exp.Expression] = []
        # `math.inf` stands for "no limit"; `to_s` omits the limit in that case.
        self.limit: float = math.inf
        self.condition: t.Optional[exp.Expression] = None

    def add_dependency(self, dependency: Step) -> None:
        """Record that this step depends on `dependency`, in both directions."""
        self.dependencies.add(dependency)
        dependency.dependents.add(self)

    def __repr__(self) -> str:
        return self.to_s()

    def to_s(self, level: int = 0) -> str:
        """Render this step and, recursively, its dependencies as indented text.

        Args:
            level: nesting depth, used to compute the indentation.

        Returns:
            The lines of the rendering joined with newlines.
        """
        # NOTE(review): this listing was whitespace-collapsed; confirm the width of
        # the whitespace-only parts of the literals below against the upstream file.
        indent = " " * level
        nested = f"{indent} "

        context = self._to_s(f"{nested} ")

        if context:
            context = [f"{nested}Context:"] + context

        lines = [
            f"{indent}- {self.id}",
            *context,
            f"{nested}Projections:",
        ]

        for expression in self.projections:
            lines.append(f"{nested} - {expression.sql()}")

        if self.condition:
            lines.append(f"{nested}Condition: {self.condition.sql()}")

        # Compare by value: an infinity that is not the `math.inf` object itself
        # (e.g. float("inf")) must also count as "no limit".
        if self.limit != math.inf:
            lines.append(f"{nested}Limit: {self.limit}")

        if self.dependencies:
            lines.append(f"{nested}Dependencies:")
            for dependency in self.dependencies:
                lines.append(" " + dependency.to_s(level + 1))

        return "\n".join(lines)

    @property
    def type_name(self) -> str:
        """Name of the concrete step class."""
        return self.__class__.__name__

    @property
    def id(self) -> str:
        """Label built from the type name, the optional step name and `id(self)`."""
        name = self.name
        name = f" {name}" if name else ""
        return f"{self.type_name}:{name} ({id(self)})"

    def _to_s(self, _indent: str) -> t.List[str]:
        """Return step-specific context lines; the base step has none."""
        return []
@classmethod
def from_expression(
    cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
) -> Step:
    """
    Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine.
    Note: the expression's tables and subqueries must be aliased for this method to work. For
    example, given the following expression:

    SELECT
      x.a,
      SUM(x.b)
    FROM x AS x
    JOIN y AS y
      ON x.a = y.a
    GROUP BY x.a

    the following DAG is produced (the expression IDs might differ per execution):

    - Aggregate: x (4347984624)
        Context:
          Aggregations:
            - SUM(x.b)
          Group:
            - x.a
        Projections:
          - x.a
          - "x".""
        Dependencies:
        - Join: x (4347985296)
          Context:
            y:
            On: x.a = y.a
          Projections:
          Dependencies:
          - Scan: x (4347983136)
            Context:
              Source: x AS x
            Projections:
          - Scan: y (4343416624)
            Context:
              Source: y AS y
            Projections:

    Args:
        expression: the expression to build the DAG from.
        ctes: a dictionary that maps CTEs to their corresponding Step DAG by name.

    Returns:
        A Step DAG corresponding to `expression`.
    """
    ctes = ctes or {}
    with_ = expression.args.get("with")

    # CTEs break the mold of scope and introduce themselves to all in the context.
    if with_:
        # Work on a copy so the caller's mapping is not modified.
        ctes = ctes.copy()
        for cte in with_.expressions:
            step = Step.from_expression(cte.this, ctes)
            step.name = cte.alias
            ctes[step.name] = step  # type: ignore

    from_ = expression.args.get("from")

    # Pick the starting step: a scan of the FROM source, a set operation,
    # or a bare Scan with no source when neither applies.
    if isinstance(expression, exp.Select) and from_:
        step = Scan.from_expression(from_.this, ctes)
    elif isinstance(expression, exp.Union):
        step = SetOperation.from_expression(expression, ctes)
    else:
        step = Scan()

    joins = expression.args.get("joins")

    if joins:
        # The join step takes over the name of the step it wraps.
        join = Join.from_joins(joins, ctes)
        join.name = step.name
        join.add_dependency(step)
        step = join

    projections = []  # final selects in this chain of steps representing a select
    operands = {}  # intermediate computations of agg funcs eg x + 1 in SUM(x + 1)
    aggregations = []
    sequence = itertools.count()

    def extract_agg_operands(expression):
        # Every non-column operand of an aggregate function gets an `_a_<n>` alias
        # (reused when the same operand was seen before) and is swapped for a
        # quoted column that refers to that alias.
        for agg in expression.find_all(exp.AggFunc):
            for operand in agg.unnest_operands():
                if isinstance(operand, exp.Column):
                    continue
                if operand not in operands:
                    operands[operand] = f"_a_{next(sequence)}"
                # NOTE(review): nesting reconstructed from a whitespace-collapsed
                # listing; confirm this call sits outside the `if` above.
                operand.replace(exp.column(operands[operand], quoted=True))

    for e in expression.expressions:
        if e.find(exp.AggFunc):
            # Aggregated selects are projected as a column named after the select.
            projections.append(exp.column(e.alias_or_name, step.name, quoted=True))
            aggregations.append(e)
            extract_agg_operands(e)
        else:
            projections.append(e)

    where = expression.args.get("where")

    if where:
        step.condition = where.this

    group = expression.args.get("group")

    if group or aggregations:
        aggregate = Aggregate()
        aggregate.source = step.name
        aggregate.name = step.name

        having = expression.args.get("having")

        if having:
            extract_agg_operands(having)
            aggregate.condition = having.this

        aggregate.operands = tuple(
            alias(operand, alias_) for operand, alias_ in operands.items()
        )
        aggregate.aggregations = aggregations
        # give aggregates names and replace projections with references to them
        aggregate.group = {
            f"_g{i}": e for i, e in enumerate(group.expressions if group else [])
        }
        for projection in projections:
            for i, e in aggregate.group.items():
                for child, *_ in projection.walk():
                    if child == e:
                        child.replace(exp.column(i, step.name))
        aggregate.add_dependency(step)
        step = aggregate

    order = expression.args.get("order")

    if order:
        sort = Sort()
        sort.name = step.name
        sort.key = order.expressions
        sort.add_dependency(step)
        step = sort

    # Projections are attached to whichever step is outermost at this point.
    step.projections = projections

    if isinstance(expression, exp.Select) and expression.args.get("distinct"):
        # DISTINCT is expressed as an Aggregate grouping on every projection.
        distinct = Aggregate()
        distinct.source = step.name
        distinct.name = step.name
        distinct.group = {
            e.alias_or_name: exp.column(col=e.alias_or_name, table=step.name)
            for e in projections or expression.expressions
        }
        distinct.add_dependency(step)
        step = distinct

    limit = expression.args.get("limit")

    if limit:
        step.limit = int(limit.text("expression"))

    return step
Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine. Note: the expression's tables and subqueries must be aliased for this method to work. For example, given the following expression:
SELECT x.a, SUM(x.b) FROM x AS x JOIN y AS y ON x.a = y.a GROUP BY x.a
the following DAG is produced (the expression IDs might differ per execution):
- Aggregate: x (4347984624)
Context:
Aggregations:
- SUM(x.b)
Group:
- x.a
Projections:
- x.a
- "x".""
Dependencies:
- Join: x (4347985296) Context: y: On: x.a = y.a Projections: Dependencies:
- Scan: x (4347983136) Context: Source: x AS x Projections:
- Scan: y (4343416624) Context: Source: y AS y Projections:
Arguments:
- expression: the expression to build the DAG from.
- ctes: a dictionary that maps CTEs to their corresponding Step DAG by name.
Returns:
A Step DAG corresponding to `expression`.
def to_s(self, level: int = 0) -> str:
    """Render this step and, recursively, its dependencies as indented text.

    Args:
        level: nesting depth, used to compute the indentation.

    Returns:
        The lines of the rendering joined with newlines.
    """
    # NOTE(review): this listing was whitespace-collapsed; confirm the width of
    # the whitespace-only parts of the literals below against the upstream file.
    indent = " " * level
    nested = f"{indent} "

    context = self._to_s(f"{nested} ")

    if context:
        context = [f"{nested}Context:"] + context

    lines = [
        f"{indent}- {self.id}",
        *context,
        f"{nested}Projections:",
    ]

    for expression in self.projections:
        lines.append(f"{nested} - {expression.sql()}")

    if self.condition:
        lines.append(f"{nested}Condition: {self.condition.sql()}")

    # Compare by value: an infinity that is not the `math.inf` object itself
    # (e.g. float("inf")) must also count as "no limit".
    if self.limit != math.inf:
        lines.append(f"{nested}Limit: {self.limit}")

    if self.dependencies:
        lines.append(f"{nested}Dependencies:")
        for dependency in self.dependencies:
            lines.append(" " + dependency.to_s(level + 1))

    return "\n".join(lines)
class Scan(Step):
    """Step that reads from a source expression."""

    @classmethod
    def from_expression(
        cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
    ) -> Step:
        """Build the step for a FROM/JOIN source.

        A subquery is planned through `Step.from_expression` and named after its
        alias. Anything else becomes a `Scan` that depends on the CTE step of the
        same name when `ctes` has one.
        """
        name = expression.alias_or_name

        if isinstance(expression, exp.Subquery):
            subquery_step = Step.from_expression(expression.this, ctes)
            subquery_step.name = name
            return subquery_step

        scan = Scan()
        scan.name = name
        scan.source = expression
        if ctes:
            referenced = expression.name
            if referenced in ctes:
                scan.add_dependency(ctes[referenced])

        return scan

    def __init__(self) -> None:
        super().__init__()
        self.source: t.Optional[exp.Expression] = None

    def _to_s(self, indent: str) -> t.List[str]:
        """Return the single context line naming the source."""
        source = self.source
        return [f"{indent}Source: {source.sql() if source else '-static-'}"]
@classmethod
def from_expression(
    cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
) -> Step:
    """Build the step for a FROM/JOIN source.

    A subquery is planned through `Step.from_expression` and named after its
    alias. Anything else becomes a `Scan` that depends on the CTE step of the
    same name when `ctes` has one.
    """
    name = expression.alias_or_name

    if isinstance(expression, exp.Subquery):
        subquery_step = Step.from_expression(expression.this, ctes)
        subquery_step.name = name
        return subquery_step

    scan = Scan()
    scan.name = name
    scan.source = expression
    if ctes:
        referenced = expression.name
        if referenced in ctes:
            scan.add_dependency(ctes[referenced])

    return scan
Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine. Note: the expression's tables and subqueries must be aliased for this method to work. For example, given the following expression:
SELECT x.a, SUM(x.b) FROM x AS x JOIN y AS y ON x.a = y.a GROUP BY x.a
the following DAG is produced (the expression IDs might differ per execution):
- Aggregate: x (4347984624)
Context:
Aggregations:
- SUM(x.b)
Group:
- x.a
Projections:
- x.a
- "x".""
Dependencies:
- Join: x (4347985296) Context: y: On: x.a = y.a Projections: Dependencies:
- Scan: x (4347983136) Context: Source: x AS x Projections:
- Scan: y (4343416624) Context: Source: y AS y Projections:
Arguments:
- expression: the expression to build the DAG from.
- ctes: a dictionary that maps CTEs to their corresponding Step DAG by name.
Returns:
A Step DAG corresponding to `expression`.
Inherited Members
class Join(Step):
    """Step that combines the wrapped step with one scan per joined source."""

    @classmethod
    def from_joins(
        cls, joins: t.Iterable[exp.Join], ctes: t.Optional[t.Dict[str, Step]] = None
    ) -> Step:
        """Build a single `Join` step that covers all of `joins`.

        Each join contributes one entry to `joins`, keyed by the joined source's
        alias or name, plus a dependency on the scan of that source.
        """
        combined = Join()

        for joined in joins:
            source_key, join_key, condition = join_condition(joined)
            target = joined.this
            combined.joins[target.alias_or_name] = dict(
                side=joined.side,  # type: ignore
                join_key=join_key,
                source_key=source_key,
                condition=condition,
            )
            combined.add_dependency(Scan.from_expression(target, ctes))

        return combined

    def __init__(self) -> None:
        super().__init__()
        # Keyed by the joined source's alias or name.
        self.joins: t.Dict[str, t.Dict[str, t.List[str] | exp.Expression]] = {}

    def _to_s(self, indent: str) -> t.List[str]:
        """Return one line per join, plus an `On:` line when it has a condition."""
        rendered = []
        for name, details in self.joins.items():
            rendered.append(f"{indent}{name}: {details['side']}")
            condition = details.get("condition")
            if condition:
                rendered.append(f"{indent}On: {condition.sql()}")  # type: ignore
        return rendered
@classmethod
def from_joins(
    cls, joins: t.Iterable[exp.Join], ctes: t.Optional[t.Dict[str, Step]] = None
) -> Step:
    """Build a single `Join` step that covers all of `joins`.

    Each join contributes one entry to `joins`, keyed by the joined source's
    alias or name, plus a dependency on the scan of that source.
    """
    combined = Join()

    for joined in joins:
        source_key, join_key, condition = join_condition(joined)
        target = joined.this
        combined.joins[target.alias_or_name] = dict(
            side=joined.side,  # type: ignore
            join_key=join_key,
            source_key=source_key,
            condition=condition,
        )
        combined.add_dependency(Scan.from_expression(target, ctes))

    return combined
Inherited Members
class Aggregate(Step):
    """Step holding aggregations, grouping keys and aggregate operands."""

    def __init__(self) -> None:
        super().__init__()
        self.aggregations: t.List[exp.Expression] = []
        self.operands: t.Tuple[exp.Expression, ...] = ()
        self.group: t.Dict[str, exp.Expression] = {}
        self.source: t.Optional[str] = None

    def _to_s(self, indent: str) -> t.List[str]:
        """Return the context lines: aggregations, then group, having and operands if set."""
        out = [f"{indent}Aggregations:"]
        out.extend(f"{indent} - {agg.sql()}" for agg in self.aggregations)

        if self.group:
            out.append(f"{indent}Group:")
            out.extend(f"{indent} - {key.sql()}" for key in self.group.values())

        if self.condition:
            out += [f"{indent}Having:", f"{indent} - {self.condition.sql()}"]

        if self.operands:
            out.append(f"{indent}Operands:")
            out.extend(f"{indent} - {operand.sql()}" for operand in self.operands)

        return out
Inherited Members
class Sort(Step):
    """Step that orders its dependency by the expressions in `key`."""

    def __init__(self) -> None:
        super().__init__()
        # Sort expressions; None until a caller assigns them.
        self.key: t.Optional[t.List[exp.Expression]] = None

    def _to_s(self, indent: str) -> t.List[str]:
        """Return the `Key:` header followed by one line per sort expression."""
        lines = [f"{indent}Key:"]

        # `key` starts out as None; iterating it directly raised TypeError when a
        # freshly constructed Sort was rendered. Render an empty key list instead.
        for expression in self.key or ():
            lines.append(f"{indent} - {expression.sql()}")

        return lines
Inherited Members
class SetOperation(Step):
    """Step for a set operation over two sub-plans."""

    def __init__(
        self,
        op: t.Type[exp.Expression],
        left: str | None,
        right: str | None,
        distinct: bool = False,
    ) -> None:
        super().__init__()
        self.op, self.left, self.right = op, left, right
        self.distinct = distinct

    @classmethod
    def from_expression(
        cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
    ) -> Step:
        """Plan both sides of `expression` and join them under one step.

        The new step records the expression's class as `op`, the names of the
        two sub-plans, and whether the `distinct` arg is set.
        """
        assert isinstance(expression, exp.Union)
        left_step = Step.from_expression(expression.left, ctes)
        right_step = Step.from_expression(expression.right, ctes)
        is_distinct = bool(expression.args.get("distinct"))

        set_op = cls(
            op=expression.__class__,
            left=left_step.name,
            right=right_step.name,
            distinct=is_distinct,
        )
        for side in (left_step, right_step):
            set_op.add_dependency(side)
        return set_op

    def _to_s(self, indent: str) -> t.List[str]:
        """Return a `Distinct:` line when `distinct` is truthy, else nothing."""
        return [f"{indent}Distinct: {self.distinct}"] if self.distinct else []

    @property
    def type_name(self) -> str:
        """Name of the set-operation expression class rather than of this step class."""
        return self.op.__name__
@classmethod
def from_expression(
    cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
) -> Step:
    """Plan both sides of `expression` and join them under one step.

    The new step records the expression's class as `op`, the names of the
    two sub-plans, and whether the `distinct` arg is set.
    """
    assert isinstance(expression, exp.Union)
    left_step = Step.from_expression(expression.left, ctes)
    right_step = Step.from_expression(expression.right, ctes)
    is_distinct = bool(expression.args.get("distinct"))

    set_op = cls(
        op=expression.__class__,
        left=left_step.name,
        right=right_step.name,
        distinct=is_distinct,
    )
    for side in (left_step, right_step):
        set_op.add_dependency(side)
    return set_op
Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine. Note: the expression's tables and subqueries must be aliased for this method to work. For example, given the following expression:
SELECT x.a, SUM(x.b) FROM x AS x JOIN y AS y ON x.a = y.a GROUP BY x.a
the following DAG is produced (the expression IDs might differ per execution):
- Aggregate: x (4347984624)
Context:
Aggregations:
- SUM(x.b)
Group:
- x.a
Projections:
- x.a
- "x".""
Dependencies:
- Join: x (4347985296) Context: y: On: x.a = y.a Projections: Dependencies:
- Scan: x (4347983136) Context: Source: x AS x Projections:
- Scan: y (4343416624) Context: Source: y AS y Projections:
Arguments:
- expression: the expression to build the DAG from.
- ctes: a dictionary that maps CTEs to their corresponding Step DAG by name.
Returns:
A Step DAG corresponding to `expression`.