sqlglot.planner
from __future__ import annotations

import itertools
import math
import typing as t

from sqlglot import alias, exp
from sqlglot.errors import UnsupportedError
from sqlglot.optimizer.eliminate_joins import join_condition


class Plan:
    """A query execution plan: the root :class:`Step` plus its dependency graph."""

    def __init__(self, expression: exp.Expression) -> None:
        # Plan against a private copy: building steps rewrites nodes of the tree
        # in place (aggregate operands and GROUP BY keys are replaced).
        self.expression = expression.copy()
        self.root = Step.from_expression(self.expression)
        self._dag: t.Dict[Step, t.Set[Step]] = {}

    @property
    def dag(self) -> t.Dict[Step, t.Set[Step]]:
        """Map each step reachable from the root to the set of its direct dependencies.

        The mapping is computed on first access and cached afterwards.
        """
        if self._dag:
            return self._dag

        graph: t.Dict[Step, t.Set[Step]] = {}
        frontier = {self.root}

        while frontier:
            current = frontier.pop()
            graph[current] = set(current.dependencies)
            frontier.update(current.dependencies)

        self._dag = graph
        return graph

    @property
    def leaves(self) -> t.Iterator[Step]:
        """Lazily yield the steps that have no dependencies."""
        return (step for step, deps in self.dag.items() if not deps)

    def __repr__(self) -> str:
        return f"Plan\n----\n{repr(self.root)}"


class Step:
    @classmethod
    def from_expression(
        cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
    ) -> Step:
        """
        Build a DAG of Steps from a SQL expression, making it easier to execute in an engine.

        Note: every table and subquery in the expression must be aliased for this to work.
        For example, given the following expression:

            SELECT
              x.a,
              SUM(x.b)
            FROM x AS x
            JOIN y AS y
              ON x.a = y.a
            GROUP BY x.a

        the following DAG is produced (the expression IDs might differ per execution):

            - Aggregate: x (4347984624)
                Context:
                  Aggregations:
                    - SUM(x.b)
                  Group:
                    - x.a
                Projections:
                  - x.a
                  - "x".""
                Dependencies:
                - Join: x (4347985296)
                  Context:
                    y:
                    On: x.a = y.a
                  Projections:
                  Dependencies:
                  - Scan: x (4347983136)
                    Context:
                      Source: x AS x
                    Projections:
                  - Scan: y (4343416624)
                    Context:
                      Source: y AS y
                    Projections:

        Args:
            expression: the expression to build the DAG from. Nodes of this tree are
                rewritten in place while planning.
            ctes: a dictionary that maps CTE names to their corresponding Step DAG.

        Returns:
            A Step DAG corresponding to `expression`.

        Raises:
            UnsupportedError: if a SELECT has more than one FROM source.
        """
        ctes = ctes or {}
        with_ = expression.args.get("with")

        # CTEs are visible to everything planned from this expression. Each one is
        # planned against the CTEs declared before it and registered on a private
        # copy, so the caller's mapping is left untouched.
        if with_:
            ctes = dict(ctes)
            for cte in with_.expressions:
                cte_step = Step.from_expression(cte.this, ctes)
                cte_step.name = cte.alias
                ctes[cte_step.name] = cte_step  # type: ignore

        from_ = expression.args.get("from")

        if isinstance(expression, exp.Select) and from_:
            sources = from_.expressions
            if len(sources) > 1:
                raise UnsupportedError(
                    "Multi-from statements are unsupported. Run it through the optimizer"
                )
            step = Scan.from_expression(sources[0], ctes)
        elif isinstance(expression, exp.Union):
            step = SetOperation.from_expression(expression, ctes)
        else:
            # No FROM clause: a Scan without a source.
            step = Scan()

        joins = expression.args.get("joins")

        if joins:
            join_step = Join.from_joins(joins, ctes)
            join_step.name = step.name
            join_step.add_dependency(step)
            step = join_step

        projections = []  # final selects in this chain of steps representing a select
        operands = {}  # intermediate computations of agg funcs, e.g. x + 1 in SUM(x + 1)
        aggregations = []
        sequence = itertools.count()

        def extract_agg_operands(node):
            # Replace every non-column argument of an aggregate with a reference to a
            # generated "_a_<n>" column; equal operands share one generated name.
            for agg in node.find_all(exp.AggFunc):
                for operand in agg.unnest_operands():
                    if isinstance(operand, exp.Column):
                        continue
                    name = operands.get(operand)
                    if name is None:
                        name = operands[operand] = f"_a_{next(sequence)}"
                    operand.replace(exp.column(name, quoted=True))

        for select in expression.expressions:
            if not select.find(exp.AggFunc):
                projections.append(select)
                continue
            # Aggregated selects are computed by the Aggregate step; the final
            # projection only references the result by name.
            projections.append(exp.column(select.alias_or_name, step.name, quoted=True))
            aggregations.append(select)
            extract_agg_operands(select)

        where = expression.args.get("where")

        if where:
            step.condition = where.this

        group = expression.args.get("group")

        if group or aggregations:
            aggregate = Aggregate()
            aggregate.source = step.name
            aggregate.name = step.name

            having = expression.args.get("having")

            if having:
                extract_agg_operands(having)
                aggregate.condition = having.this

            aggregate.operands = tuple(
                alias(operand, column_name) for operand, column_name in operands.items()
            )
            aggregate.aggregations = aggregations
            # Name each GROUP BY key "_g<i>" and point matching projection nodes at it.
            group_keys = group.expressions if group else []
            aggregate.group = {f"_g{index}": key for index, key in enumerate(group_keys)}
            for projection in projections:
                for key_name, key in aggregate.group.items():
                    for node, _parent, _arg in projection.walk():
                        if node == key:
                            node.replace(exp.column(key_name, step.name))
            aggregate.add_dependency(step)
            step = aggregate

        order = expression.args.get("order")

        if order:
            sort = Sort()
            sort.name = step.name
            sort.key = order.expressions
            sort.add_dependency(step)
            step = sort

        step.projections = projections

        if isinstance(expression, exp.Select) and expression.args.get("distinct"):
            # DISTINCT is planned as an Aggregate grouping on every output column.
            distinct = Aggregate()
            distinct.source = step.name
            distinct.name = step.name
            distinct.group = {
                projection.alias_or_name: exp.column(
                    col=projection.alias_or_name, table=step.name
                )
                for projection in projections or expression.expressions
            }
            distinct.add_dependency(step)
            step = distinct

        limit = expression.args.get("limit")

        if limit:
            step.limit = int(limit.text("expression"))

        return step

    def __init__(self) -> None:
        self.name: t.Optional[str] = None
        self.dependencies: t.Set[Step] = set()
        self.dependents: t.Set[Step] = set()
        self.projections: t.Sequence[exp.Expression] = []
        # math.inf means "no LIMIT"; from_expression stores an int when one is present.
        self.limit: float = math.inf
        self.condition: t.Optional[exp.Expression] = None

    def add_dependency(self, dependency: Step) -> None:
        """Record that this step consumes ``dependency``, linking both directions."""
        self.dependencies.add(dependency)
        dependency.dependents.add(self)

    def __repr__(self) -> str:
        return self.to_s()

    def to_s(self, level: int = 0) -> str:
        """Render this step and, recursively, its dependencies as an indented tree.

        Args:
            level: nesting depth; deeper levels are indented further.

        Returns:
            The multi-line textual rendering (this is also what ``repr`` shows).
        """
        indent = "  " * level
        nested = f"{indent}    "

        context = self._to_s(f"{nested}  ")

        if context:
            context = [f"{nested}Context:"] + context

        lines = [
            f"{indent}- {self.id}",
            *context,
            f"{nested}Projections:",
        ]

        for expression in self.projections:
            lines.append(f"{nested}  - {expression.sql()}")

        if self.condition:
            lines.append(f"{nested}Condition: {self.condition.sql()}")

        # Compare by value, not identity: float("inf") is not the math.inf object
        # but still means "no limit".
        if self.limit != math.inf:
            lines.append(f"{nested}Limit: {self.limit}")

        if self.dependencies:
            lines.append(f"{nested}Dependencies:")
            for dependency in self.dependencies:
                lines.append("  " + dependency.to_s(level + 1))

        return "\n".join(lines)

    @property
    def type_name(self) -> str:
        """Label used in the rendered tree; defaults to the class name."""
        return self.__class__.__name__

    @property
    def id(self) -> str:
        """Readable identifier: type name, optional step name and the object's id()."""
        name = self.name
        name = f" {name}" if name else ""
        return f"{self.type_name}:{name} ({id(self)})"

    def _to_s(self, _indent: str) -> t.List[str]:
        """Subclass hook returning extra "Context" lines; the base step has none."""
        return []


class Scan(Step):
    """Leaf step reading from ``source``; a Scan with no source renders as ``-static-``."""

    @classmethod
    def from_expression(
        cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
    ) -> Step:
        """Plan a FROM/JOIN source.

        A subquery is planned recursively and renamed to its alias. Anything else
        becomes a Scan of ``expression``, which depends on the CTE step of the same
        name when ``ctes`` contains one.
        """
        alias_ = expression.alias_or_name

        if isinstance(expression, exp.Subquery):
            subquery_step = Step.from_expression(expression.this, ctes)
            subquery_step.name = alias_
            return subquery_step

        scan = Scan()
        scan.name = alias_
        scan.source = expression
        if ctes and expression.name in ctes:
            scan.add_dependency(ctes[expression.name])
        return scan

    def __init__(self) -> None:
        super().__init__()
        self.source: t.Optional[exp.Expression] = None

    def _to_s(self, indent: str) -> t.List[str]:
        rendered = self.source.sql() if self.source else "-static-"  # type: ignore
        return [f"{indent}Source: {rendered}"]
class Join(Step):
    """Step joining its source step with one or more joined sources."""

    @classmethod
    def from_joins(
        cls, joins: t.Iterable[exp.Join], ctes: t.Optional[t.Dict[str, Step]] = None
    ) -> Step:
        """Build a Join step from JOIN clauses.

        Each clause is recorded under the joined source's alias, together with its side
        and the keys/condition returned by ``join_condition``. The joined source is
        planned with ``Scan.from_expression`` and added as a dependency.
        """
        join_step = Join()

        for join in joins:
            source_key, join_key, condition = join_condition(join)
            joined = join.this
            join_step.joins[joined.alias_or_name] = dict(
                side=join.side,
                join_key=join_key,
                source_key=source_key,
                condition=condition,
            )
            join_step.add_dependency(Scan.from_expression(joined, ctes))

        return join_step

    def __init__(self) -> None:
        super().__init__()
        # alias of each joined source -> description of that join
        self.joins: t.Dict[str, t.Dict[str, t.List[str] | exp.Expression]] = {}

    def _to_s(self, indent: str) -> t.List[str]:
        lines = []
        for name, join in self.joins.items():
            lines.append(f"{indent}{name}: {join['side']}")
            condition = join.get("condition")
            if condition:
                lines.append(f"{indent}On: {condition.sql()}")  # type: ignore
        return lines


class Aggregate(Step):
    """Step that groups rows and computes aggregations (the planner also uses it for DISTINCT)."""

    def __init__(self) -> None:
        super().__init__()
        self.aggregations: t.List[exp.Expression] = []
        self.operands: t.Tuple[exp.Expression, ...] = ()
        self.group: t.Dict[str, exp.Expression] = {}
        self.source: t.Optional[str] = None

    def _to_s(self, indent: str) -> t.List[str]:
        item = f"{indent}  - "
        lines = [f"{indent}Aggregations:"]
        lines.extend(item + expression.sql() for expression in self.aggregations)

        if self.group:
            lines.append(f"{indent}Group:")
            lines.extend(item + expression.sql() for expression in self.group.values())
        if self.condition:
            lines.append(f"{indent}Having:")
            lines.append(item + self.condition.sql())
        if self.operands:
            lines.append(f"{indent}Operands:")
            lines.extend(item + expression.sql() for expression in self.operands)

        return lines


class Sort(Step):
    """Step ordering its input by ``key`` (the ORDER BY expressions)."""

    def __init__(self) -> None:
        super().__init__()
        # ORDER BY expressions; None until the planner assigns them.
        self.key: t.Optional[t.Sequence[exp.Expression]] = None

    def _to_s(self, indent: str) -> t.List[str]:
        lines = [f"{indent}Key:"]
        # Tolerate a Sort whose key was never set instead of raising TypeError on repr().
        for expression in self.key or ():
            lines.append(f"{indent}  - {expression.sql()}")
        return lines


class SetOperation(Step):
    """Step combining the outputs of two steps with a set operation (e.g. UNION)."""

    def __init__(
        self,
        op: t.Type[exp.Expression],
        left: str | None,
        right: str | None,
        distinct: bool = False,
    ) -> None:
        super().__init__()
        self.op = op  # expression class of the operation; its name is the step's type_name
        self.left = left  # name of the left input step
        self.right = right  # name of the right input step
        self.distinct = distinct

    @classmethod
    def from_expression(
        cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
    ) -> Step:
        """Plan both operands (left first) and make them dependencies of one step."""
        assert isinstance(expression, exp.Union)
        left_step = Step.from_expression(expression.left, ctes)
        right_step = Step.from_expression(expression.right, ctes)
        step = cls(
            op=expression.__class__,
            left=left_step.name,
            right=right_step.name,
            distinct=bool(expression.args.get("distinct")),
        )
        for side in (left_step, right_step):
            step.add_dependency(side)
        return step

    def _to_s(self, indent: str) -> t.List[str]:
        if not self.distinct:
            return []
        return [f"{indent}Distinct: {self.distinct}"]

    @property
    def type_name(self) -> str:
        return self.op.__name__
class Plan:
    """A query execution plan: the root :class:`Step` plus its dependency graph."""

    def __init__(self, expression: exp.Expression) -> None:
        # Plan against a private copy: building steps rewrites nodes of the tree
        # in place.
        self.expression = expression.copy()
        self.root = Step.from_expression(self.expression)
        self._dag: t.Dict[Step, t.Set[Step]] = {}

    @property
    def dag(self) -> t.Dict[Step, t.Set[Step]]:
        """Map each step reachable from the root to the set of its direct dependencies.

        The mapping is computed on first access and cached afterwards.
        """
        if self._dag:
            return self._dag

        graph: t.Dict[Step, t.Set[Step]] = {}
        frontier = {self.root}

        while frontier:
            current = frontier.pop()
            graph[current] = set(current.dependencies)
            frontier.update(current.dependencies)

        self._dag = graph
        return graph

    @property
    def leaves(self) -> t.Iterator[Step]:
        """Lazily yield the steps that have no dependencies."""
        return (step for step, deps in self.dag.items() if not deps)

    def __repr__(self) -> str:
        return f"Plan\n----\n{repr(self.root)}"
class Step:
    @classmethod
    def from_expression(
        cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
    ) -> Step:
        """
        Build a DAG of Steps from a SQL expression, making it easier to execute in an engine.

        Note: every table and subquery in the expression must be aliased for this to work.
        For example, given the following expression:

            SELECT
              x.a,
              SUM(x.b)
            FROM x AS x
            JOIN y AS y
              ON x.a = y.a
            GROUP BY x.a

        the following DAG is produced (the expression IDs might differ per execution):

            - Aggregate: x (4347984624)
                Context:
                  Aggregations:
                    - SUM(x.b)
                  Group:
                    - x.a
                Projections:
                  - x.a
                  - "x".""
                Dependencies:
                - Join: x (4347985296)
                  Context:
                    y:
                    On: x.a = y.a
                  Projections:
                  Dependencies:
                  - Scan: x (4347983136)
                    Context:
                      Source: x AS x
                    Projections:
                  - Scan: y (4343416624)
                    Context:
                      Source: y AS y
                    Projections:

        Args:
            expression: the expression to build the DAG from. Nodes of this tree are
                rewritten in place while planning.
            ctes: a dictionary that maps CTE names to their corresponding Step DAG.

        Returns:
            A Step DAG corresponding to `expression`.

        Raises:
            UnsupportedError: if a SELECT has more than one FROM source.
        """
        ctes = ctes or {}
        with_ = expression.args.get("with")

        # CTEs are visible to everything planned from this expression. Each one is
        # planned against the CTEs declared before it and registered on a private
        # copy, so the caller's mapping is left untouched.
        if with_:
            ctes = dict(ctes)
            for cte in with_.expressions:
                cte_step = Step.from_expression(cte.this, ctes)
                cte_step.name = cte.alias
                ctes[cte_step.name] = cte_step  # type: ignore

        from_ = expression.args.get("from")

        if isinstance(expression, exp.Select) and from_:
            sources = from_.expressions
            if len(sources) > 1:
                raise UnsupportedError(
                    "Multi-from statements are unsupported. Run it through the optimizer"
                )
            step = Scan.from_expression(sources[0], ctes)
        elif isinstance(expression, exp.Union):
            step = SetOperation.from_expression(expression, ctes)
        else:
            # No FROM clause: a Scan without a source.
            step = Scan()

        joins = expression.args.get("joins")

        if joins:
            join_step = Join.from_joins(joins, ctes)
            join_step.name = step.name
            join_step.add_dependency(step)
            step = join_step

        projections = []  # final selects in this chain of steps representing a select
        operands = {}  # intermediate computations of agg funcs, e.g. x + 1 in SUM(x + 1)
        aggregations = []
        sequence = itertools.count()

        def extract_agg_operands(node):
            # Replace every non-column argument of an aggregate with a reference to a
            # generated "_a_<n>" column; equal operands share one generated name.
            for agg in node.find_all(exp.AggFunc):
                for operand in agg.unnest_operands():
                    if isinstance(operand, exp.Column):
                        continue
                    name = operands.get(operand)
                    if name is None:
                        name = operands[operand] = f"_a_{next(sequence)}"
                    operand.replace(exp.column(name, quoted=True))

        for select in expression.expressions:
            if not select.find(exp.AggFunc):
                projections.append(select)
                continue
            # Aggregated selects are computed by the Aggregate step; the final
            # projection only references the result by name.
            projections.append(exp.column(select.alias_or_name, step.name, quoted=True))
            aggregations.append(select)
            extract_agg_operands(select)

        where = expression.args.get("where")

        if where:
            step.condition = where.this

        group = expression.args.get("group")

        if group or aggregations:
            aggregate = Aggregate()
            aggregate.source = step.name
            aggregate.name = step.name

            having = expression.args.get("having")

            if having:
                extract_agg_operands(having)
                aggregate.condition = having.this

            aggregate.operands = tuple(
                alias(operand, column_name) for operand, column_name in operands.items()
            )
            aggregate.aggregations = aggregations
            # Name each GROUP BY key "_g<i>" and point matching projection nodes at it.
            group_keys = group.expressions if group else []
            aggregate.group = {f"_g{index}": key for index, key in enumerate(group_keys)}
            for projection in projections:
                for key_name, key in aggregate.group.items():
                    for node, _parent, _arg in projection.walk():
                        if node == key:
                            node.replace(exp.column(key_name, step.name))
            aggregate.add_dependency(step)
            step = aggregate

        order = expression.args.get("order")

        if order:
            sort = Sort()
            sort.name = step.name
            sort.key = order.expressions
            sort.add_dependency(step)
            step = sort

        step.projections = projections

        if isinstance(expression, exp.Select) and expression.args.get("distinct"):
            # DISTINCT is planned as an Aggregate grouping on every output column.
            distinct = Aggregate()
            distinct.source = step.name
            distinct.name = step.name
            distinct.group = {
                projection.alias_or_name: exp.column(
                    col=projection.alias_or_name, table=step.name
                )
                for projection in projections or expression.expressions
            }
            distinct.add_dependency(step)
            step = distinct

        limit = expression.args.get("limit")

        if limit:
            step.limit = int(limit.text("expression"))

        return step

    def __init__(self) -> None:
        self.name: t.Optional[str] = None
        self.dependencies: t.Set[Step] = set()
        self.dependents: t.Set[Step] = set()
        self.projections: t.Sequence[exp.Expression] = []
        # math.inf means "no LIMIT"; from_expression stores an int when one is present.
        self.limit: float = math.inf
        self.condition: t.Optional[exp.Expression] = None

    def add_dependency(self, dependency: Step) -> None:
        """Record that this step consumes ``dependency``, linking both directions."""
        self.dependencies.add(dependency)
        dependency.dependents.add(self)

    def __repr__(self) -> str:
        return self.to_s()

    def to_s(self, level: int = 0) -> str:
        """Render this step and, recursively, its dependencies as an indented tree.

        Args:
            level: nesting depth; deeper levels are indented further.

        Returns:
            The multi-line textual rendering (this is also what ``repr`` shows).
        """
        indent = "  " * level
        nested = f"{indent}    "

        context = self._to_s(f"{nested}  ")

        if context:
            context = [f"{nested}Context:"] + context

        lines = [
            f"{indent}- {self.id}",
            *context,
            f"{nested}Projections:",
        ]

        for expression in self.projections:
            lines.append(f"{nested}  - {expression.sql()}")

        if self.condition:
            lines.append(f"{nested}Condition: {self.condition.sql()}")

        # Compare by value, not identity: float("inf") is not the math.inf object
        # but still means "no limit".
        if self.limit != math.inf:
            lines.append(f"{nested}Limit: {self.limit}")

        if self.dependencies:
            lines.append(f"{nested}Dependencies:")
            for dependency in self.dependencies:
                lines.append("  " + dependency.to_s(level + 1))

        return "\n".join(lines)

    @property
    def type_name(self) -> str:
        """Label used in the rendered tree; defaults to the class name."""
        return self.__class__.__name__

    @property
    def id(self) -> str:
        """Readable identifier: type name, optional step name and the object's id()."""
        name = self.name
        name = f" {name}" if name else ""
        return f"{self.type_name}:{name} ({id(self)})"

    def _to_s(self, _indent: str) -> t.List[str]:
        """Subclass hook returning extra "Context" lines; the base step has none."""
        return []
@classmethod
def from_expression(
    cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
) -> Step:
    """
    Build a DAG of Steps from a SQL expression, making it easier to execute in an engine.

    Note: every table and subquery in the expression must be aliased for this to work.
    For example, given the following expression:

        SELECT
          x.a,
          SUM(x.b)
        FROM x AS x
        JOIN y AS y
          ON x.a = y.a
        GROUP BY x.a

    the following DAG is produced (the expression IDs might differ per execution):

        - Aggregate: x (4347984624)
            Context:
              Aggregations:
                - SUM(x.b)
              Group:
                - x.a
            Projections:
              - x.a
              - "x".""
            Dependencies:
            - Join: x (4347985296)
              Context:
                y:
                On: x.a = y.a
              Projections:
              Dependencies:
              - Scan: x (4347983136)
                Context:
                  Source: x AS x
                Projections:
              - Scan: y (4343416624)
                Context:
                  Source: y AS y
                Projections:

    Args:
        expression: the expression to build the DAG from. Nodes of this tree are
            rewritten in place while planning.
        ctes: a dictionary that maps CTE names to their corresponding Step DAG.

    Returns:
        A Step DAG corresponding to `expression`.

    Raises:
        UnsupportedError: if a SELECT has more than one FROM source.
    """
    ctes = ctes or {}
    with_ = expression.args.get("with")

    # CTEs are visible to everything planned from this expression. Each one is
    # planned against the CTEs declared before it and registered on a private
    # copy, so the caller's mapping is left untouched.
    if with_:
        ctes = dict(ctes)
        for cte in with_.expressions:
            cte_step = Step.from_expression(cte.this, ctes)
            cte_step.name = cte.alias
            ctes[cte_step.name] = cte_step  # type: ignore

    from_ = expression.args.get("from")

    if isinstance(expression, exp.Select) and from_:
        sources = from_.expressions
        if len(sources) > 1:
            raise UnsupportedError(
                "Multi-from statements are unsupported. Run it through the optimizer"
            )
        step = Scan.from_expression(sources[0], ctes)
    elif isinstance(expression, exp.Union):
        step = SetOperation.from_expression(expression, ctes)
    else:
        # No FROM clause: a Scan without a source.
        step = Scan()

    joins = expression.args.get("joins")

    if joins:
        join_step = Join.from_joins(joins, ctes)
        join_step.name = step.name
        join_step.add_dependency(step)
        step = join_step

    projections = []  # final selects in this chain of steps representing a select
    operands = {}  # intermediate computations of agg funcs, e.g. x + 1 in SUM(x + 1)
    aggregations = []
    sequence = itertools.count()

    def extract_agg_operands(node):
        # Replace every non-column argument of an aggregate with a reference to a
        # generated "_a_<n>" column; equal operands share one generated name.
        for agg in node.find_all(exp.AggFunc):
            for operand in agg.unnest_operands():
                if isinstance(operand, exp.Column):
                    continue
                name = operands.get(operand)
                if name is None:
                    name = operands[operand] = f"_a_{next(sequence)}"
                operand.replace(exp.column(name, quoted=True))

    for select in expression.expressions:
        if not select.find(exp.AggFunc):
            projections.append(select)
            continue
        # Aggregated selects are computed by the Aggregate step; the final
        # projection only references the result by name.
        projections.append(exp.column(select.alias_or_name, step.name, quoted=True))
        aggregations.append(select)
        extract_agg_operands(select)

    where = expression.args.get("where")

    if where:
        step.condition = where.this

    group = expression.args.get("group")

    if group or aggregations:
        aggregate = Aggregate()
        aggregate.source = step.name
        aggregate.name = step.name

        having = expression.args.get("having")

        if having:
            extract_agg_operands(having)
            aggregate.condition = having.this

        aggregate.operands = tuple(
            alias(operand, column_name) for operand, column_name in operands.items()
        )
        aggregate.aggregations = aggregations
        # Name each GROUP BY key "_g<i>" and point matching projection nodes at it.
        group_keys = group.expressions if group else []
        aggregate.group = {f"_g{index}": key for index, key in enumerate(group_keys)}
        for projection in projections:
            for key_name, key in aggregate.group.items():
                for node, _parent, _arg in projection.walk():
                    if node == key:
                        node.replace(exp.column(key_name, step.name))
        aggregate.add_dependency(step)
        step = aggregate

    order = expression.args.get("order")

    if order:
        sort = Sort()
        sort.name = step.name
        sort.key = order.expressions
        sort.add_dependency(step)
        step = sort

    step.projections = projections

    if isinstance(expression, exp.Select) and expression.args.get("distinct"):
        # DISTINCT is planned as an Aggregate grouping on every output column.
        distinct = Aggregate()
        distinct.source = step.name
        distinct.name = step.name
        distinct.group = {
            projection.alias_or_name: exp.column(
                col=projection.alias_or_name, table=step.name
            )
            for projection in projections or expression.expressions
        }
        distinct.add_dependency(step)
        step = distinct

    limit = expression.args.get("limit")

    if limit:
        step.limit = int(limit.text("expression"))

    return step
Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine. Note: the expression's tables and subqueries must be aliased for this method to work. For example, given the following expression:
SELECT x.a, SUM(x.b) FROM x AS x JOIN y AS y ON x.a = y.a GROUP BY x.a
the following DAG is produced (the expression IDs might differ per execution):
- Aggregate: x (4347984624)
Context:
Aggregations:
- SUM(x.b)
Group:
- x.a
Projections:
- x.a
- "x".""
Dependencies:
- Join: x (4347985296) Context: y: On: x.a = y.a Projections: Dependencies:
- Scan: x (4347983136) Context: Source: x AS x Projections:
- Scan: y (4343416624) Context: Source: y AS y Projections:
Arguments:
- expression: the expression to build the DAG from.
- ctes: a dictionary that maps CTEs to their corresponding Step DAG by name.
Returns:
A Step DAG corresponding to `expression`.
def to_s(self, level: int = 0) -> str:
    """Render this step and, recursively, its dependencies as an indented tree.

    Args:
        level: nesting depth; deeper levels are indented further.

    Returns:
        The multi-line textual rendering.
    """
    indent = "  " * level
    nested = f"{indent}    "

    context = self._to_s(f"{nested}  ")

    if context:
        context = [f"{nested}Context:"] + context

    lines = [
        f"{indent}- {self.id}",
        *context,
        f"{nested}Projections:",
    ]

    for expression in self.projections:
        lines.append(f"{nested}  - {expression.sql()}")

    if self.condition:
        lines.append(f"{nested}Condition: {self.condition.sql()}")

    # Compare by value, not identity: float("inf") is not the math.inf object
    # but still means "no limit".
    if self.limit != math.inf:
        lines.append(f"{nested}Limit: {self.limit}")

    if self.dependencies:
        lines.append(f"{nested}Dependencies:")
        for dependency in self.dependencies:
            lines.append("  " + dependency.to_s(level + 1))

    return "\n".join(lines)
class Scan(Step):
    """Leaf step reading from ``source``; a Scan with no source renders as ``-static-``."""

    @classmethod
    def from_expression(
        cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
    ) -> Step:
        """Plan a FROM/JOIN source.

        A subquery is planned recursively and renamed to its alias. Anything else
        becomes a Scan of ``expression``, which depends on the CTE step of the same
        name when ``ctes`` contains one.
        """
        alias_ = expression.alias_or_name

        if isinstance(expression, exp.Subquery):
            subquery_step = Step.from_expression(expression.this, ctes)
            subquery_step.name = alias_
            return subquery_step

        scan = Scan()
        scan.name = alias_
        scan.source = expression
        if ctes and expression.name in ctes:
            scan.add_dependency(ctes[expression.name])
        return scan

    def __init__(self) -> None:
        super().__init__()
        self.source: t.Optional[exp.Expression] = None

    def _to_s(self, indent: str) -> t.List[str]:
        rendered = self.source.sql() if self.source else "-static-"  # type: ignore
        return [f"{indent}Source: {rendered}"]
@classmethod
def from_expression(
    cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
) -> Step:
    """Plan a FROM/JOIN source.

    A subquery is planned recursively and renamed to its alias. Anything else
    becomes a Scan of ``expression``, which depends on the CTE step of the same
    name when ``ctes`` contains one.
    """
    alias_ = expression.alias_or_name

    if isinstance(expression, exp.Subquery):
        subquery_step = Step.from_expression(expression.this, ctes)
        subquery_step.name = alias_
        return subquery_step

    scan = Scan()
    scan.name = alias_
    scan.source = expression
    if ctes and expression.name in ctes:
        scan.add_dependency(ctes[expression.name])
    return scan
Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine. Note: the expression's tables and subqueries must be aliased for this method to work. For example, given the following expression:
SELECT x.a, SUM(x.b) FROM x AS x JOIN y AS y ON x.a = y.a GROUP BY x.a
the following DAG is produced (the expression IDs might differ per execution):
- Aggregate: x (4347984624)
Context:
Aggregations:
- SUM(x.b)
Group:
- x.a
Projections:
- x.a
- "x".""
Dependencies:
- Join: x (4347985296) Context: y: On: x.a = y.a Projections: Dependencies:
- Scan: x (4347983136) Context: Source: x AS x Projections:
- Scan: y (4343416624) Context: Source: y AS y Projections:
Arguments:
- expression: the expression to build the DAG from.
- ctes: a dictionary that maps CTEs to their corresponding Step DAG by name.
Returns:
A Step DAG corresponding to `expression`.
Inherited Members
class Join(Step):
    """Step joining its source step with one or more joined sources."""

    @classmethod
    def from_joins(
        cls, joins: t.Iterable[exp.Join], ctes: t.Optional[t.Dict[str, Step]] = None
    ) -> Step:
        """Build a Join step from JOIN clauses.

        Each clause is recorded under the joined source's alias, together with its side
        and the keys/condition returned by ``join_condition``. The joined source is
        planned with ``Scan.from_expression`` and added as a dependency.
        """
        join_step = Join()

        for join in joins:
            source_key, join_key, condition = join_condition(join)
            joined = join.this
            join_step.joins[joined.alias_or_name] = dict(
                side=join.side,
                join_key=join_key,
                source_key=source_key,
                condition=condition,
            )
            join_step.add_dependency(Scan.from_expression(joined, ctes))

        return join_step

    def __init__(self) -> None:
        super().__init__()
        # alias of each joined source -> description of that join
        self.joins: t.Dict[str, t.Dict[str, t.List[str] | exp.Expression]] = {}

    def _to_s(self, indent: str) -> t.List[str]:
        lines = []
        for name, join in self.joins.items():
            lines.append(f"{indent}{name}: {join['side']}")
            condition = join.get("condition")
            if condition:
                lines.append(f"{indent}On: {condition.sql()}")  # type: ignore
        return lines
@classmethod
def from_joins(
    cls, joins: t.Iterable[exp.Join], ctes: t.Optional[t.Dict[str, Step]] = None
) -> Step:
    """Build a Join step from JOIN clauses.

    Each clause is recorded under the joined source's alias, together with its side
    and the keys/condition returned by ``join_condition``. The joined source is
    planned with ``Scan.from_expression`` and added as a dependency.
    """
    join_step = Join()

    for join in joins:
        source_key, join_key, condition = join_condition(join)
        joined = join.this
        join_step.joins[joined.alias_or_name] = dict(
            side=join.side,
            join_key=join_key,
            source_key=source_key,
            condition=condition,
        )
        join_step.add_dependency(Scan.from_expression(joined, ctes))

    return join_step
Inherited Members
class Aggregate(Step):
    """Step that groups rows and computes aggregations (the planner also uses it for DISTINCT)."""

    def __init__(self) -> None:
        super().__init__()
        self.aggregations: t.List[exp.Expression] = []
        self.operands: t.Tuple[exp.Expression, ...] = ()
        self.group: t.Dict[str, exp.Expression] = {}
        self.source: t.Optional[str] = None

    def _to_s(self, indent: str) -> t.List[str]:
        item = f"{indent}  - "
        lines = [f"{indent}Aggregations:"]
        lines.extend(item + expression.sql() for expression in self.aggregations)

        if self.group:
            lines.append(f"{indent}Group:")
            lines.extend(item + expression.sql() for expression in self.group.values())
        if self.condition:
            lines.append(f"{indent}Having:")
            lines.append(item + self.condition.sql())
        if self.operands:
            lines.append(f"{indent}Operands:")
            lines.extend(item + expression.sql() for expression in self.operands)

        return lines
Inherited Members
class Sort(Step):
    """Step ordering its input by ``key`` (the ORDER BY expressions)."""

    def __init__(self) -> None:
        super().__init__()
        # ORDER BY expressions; None until the planner assigns them.
        self.key: t.Optional[t.Sequence[exp.Expression]] = None

    def _to_s(self, indent: str) -> t.List[str]:
        lines = [f"{indent}Key:"]
        # Tolerate a Sort whose key was never set instead of raising TypeError on repr().
        for expression in self.key or ():
            lines.append(f"{indent}  - {expression.sql()}")
        return lines
Inherited Members
class SetOperation(Step):
    """Step combining the outputs of two steps with a set operation (e.g. UNION)."""

    def __init__(
        self,
        op: t.Type[exp.Expression],
        left: str | None,
        right: str | None,
        distinct: bool = False,
    ) -> None:
        super().__init__()
        self.op = op  # expression class of the operation; its name is the step's type_name
        self.left = left  # name of the left input step
        self.right = right  # name of the right input step
        self.distinct = distinct

    @classmethod
    def from_expression(
        cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
    ) -> Step:
        """Plan both operands (left first) and make them dependencies of one step."""
        assert isinstance(expression, exp.Union)
        left_step = Step.from_expression(expression.left, ctes)
        right_step = Step.from_expression(expression.right, ctes)
        step = cls(
            op=expression.__class__,
            left=left_step.name,
            right=right_step.name,
            distinct=bool(expression.args.get("distinct")),
        )
        for side in (left_step, right_step):
            step.add_dependency(side)
        return step

    def _to_s(self, indent: str) -> t.List[str]:
        if not self.distinct:
            return []
        return [f"{indent}Distinct: {self.distinct}"]

    @property
    def type_name(self) -> str:
        return self.op.__name__
@classmethod
def from_expression(
    cls, expression: exp.Expression, ctes: t.Optional[t.Dict[str, Step]] = None
) -> Step:
    """Plan both operands (left first) and make them dependencies of one step."""
    assert isinstance(expression, exp.Union)
    left_step = Step.from_expression(expression.left, ctes)
    right_step = Step.from_expression(expression.right, ctes)
    step = cls(
        op=expression.__class__,
        left=left_step.name,
        right=right_step.name,
        distinct=bool(expression.args.get("distinct")),
    )
    for side in (left_step, right_step):
        step.add_dependency(side)
    return step
Builds a DAG of Steps from a SQL expression so that it's easier to execute in an engine. Note: the expression's tables and subqueries must be aliased for this method to work. For example, given the following expression:
SELECT x.a, SUM(x.b) FROM x AS x JOIN y AS y ON x.a = y.a GROUP BY x.a
the following DAG is produced (the expression IDs might differ per execution):
- Aggregate: x (4347984624)
Context:
Aggregations:
- SUM(x.b)
Group:
- x.a
Projections:
- x.a
- "x".""
Dependencies:
- Join: x (4347985296) Context: y: On: x.a = y.a Projections: Dependencies:
- Scan: x (4347983136) Context: Source: x AS x Projections:
- Scan: y (4343416624) Context: Source: y AS y Projections:
Arguments:
- expression: the expression to build the DAG from.
- ctes: a dictionary that maps CTEs to their corresponding Step DAG by name.
Returns:
A Step DAG corresponding to `expression`.