sqlglot.lineage
1from __future__ import annotations 2 3import json 4import typing as t 5from dataclasses import dataclass, field 6 7from sqlglot import Schema, exp, maybe_parse 8from sqlglot.optimizer import Scope, build_scope, optimize 9from sqlglot.optimizer.expand_laterals import expand_laterals 10from sqlglot.optimizer.qualify_columns import qualify_columns 11from sqlglot.optimizer.qualify_tables import qualify_tables 12 13if t.TYPE_CHECKING: 14 from sqlglot.dialects.dialect import DialectType 15 16 17@dataclass(frozen=True) 18class Node: 19 name: str 20 expression: exp.Expression 21 source: exp.Expression 22 downstream: t.List[Node] = field(default_factory=list) 23 24 def walk(self) -> t.Iterator[Node]: 25 yield self 26 27 for d in self.downstream: 28 if isinstance(d, Node): 29 yield from d.walk() 30 else: 31 yield d 32 33 def to_html(self, **opts) -> LineageHTML: 34 return LineageHTML(self, **opts) 35 36 37def lineage( 38 column: str | exp.Column, 39 sql: str | exp.Expression, 40 schema: t.Optional[t.Dict | Schema] = None, 41 sources: t.Optional[t.Dict[str, str | exp.Subqueryable]] = None, 42 rules: t.Sequence[t.Callable] = (qualify_tables, qualify_columns, expand_laterals), 43 dialect: DialectType = None, 44) -> Node: 45 """Build the lineage graph for a column of a SQL query. 46 47 Args: 48 column: The column to build the lineage for. 49 sql: The SQL string or expression. 50 schema: The schema of tables. 51 sources: A mapping of queries which will be used to continue building lineage. 52 rules: Optimizer rules to apply, by default only qualifying tables and columns. 53 dialect: The dialect of input SQL. 54 55 Returns: 56 A lineage node. 
57 """ 58 59 expression = maybe_parse(sql, dialect=dialect) 60 61 if sources: 62 expression = exp.expand( 63 expression, 64 { 65 k: t.cast(exp.Subqueryable, maybe_parse(v, dialect=dialect)) 66 for k, v in sources.items() 67 }, 68 ) 69 70 optimized = optimize(expression, schema=schema, rules=rules) 71 scope = build_scope(optimized) 72 tables: t.Dict[str, Node] = {} 73 74 def to_node( 75 column_name: str, 76 scope: Scope, 77 scope_name: t.Optional[str] = None, 78 upstream: t.Optional[Node] = None, 79 ) -> Node: 80 if isinstance(scope.expression, exp.Union): 81 for scope in scope.union_scopes: 82 node = to_node( 83 column_name, 84 scope=scope, 85 scope_name=scope_name, 86 upstream=upstream, 87 ) 88 return node 89 90 select = next(select for select in scope.selects if select.alias_or_name == column_name) 91 source = optimize(scope.expression.select(select, append=False), schema=schema, rules=rules) 92 select = source.selects[0] 93 94 node = Node( 95 name=f"{scope_name}.{column_name}" if scope_name else column_name, 96 source=source, 97 expression=select, 98 ) 99 100 if upstream: 101 upstream.downstream.append(node) 102 103 for c in set(select.find_all(exp.Column)): 104 table = c.table 105 source = scope.sources[table] 106 107 if isinstance(source, Scope): 108 to_node( 109 c.name, 110 scope=source, 111 scope_name=table, 112 upstream=node, 113 ) 114 else: 115 if table not in tables: 116 tables[table] = Node(name=c.sql(), source=source, expression=source) 117 node.downstream.append(tables[table]) 118 119 return node 120 121 return to_node(column if isinstance(column, str) else column.name, scope) 122 123 124class LineageHTML: 125 """Node to HTML generator using vis.js. 
126 127 https://visjs.github.io/vis-network/docs/network/ 128 """ 129 130 def __init__( 131 self, 132 node: Node, 133 dialect: DialectType = None, 134 imports: bool = True, 135 **opts: t.Any, 136 ): 137 self.node = node 138 self.imports = imports 139 140 self.options = { 141 "height": "500px", 142 "width": "100%", 143 "layout": { 144 "hierarchical": { 145 "enabled": True, 146 "nodeSpacing": 200, 147 "sortMethod": "directed", 148 }, 149 }, 150 "interaction": { 151 "dragNodes": False, 152 "selectable": False, 153 }, 154 "physics": { 155 "enabled": False, 156 }, 157 "edges": { 158 "arrows": "to", 159 }, 160 "nodes": { 161 "font": "20px monaco", 162 "shape": "box", 163 "widthConstraint": { 164 "maximum": 300, 165 }, 166 }, 167 **opts, 168 } 169 170 self.nodes = {} 171 self.edges = [] 172 173 for node in node.walk(): 174 if isinstance(node.expression, exp.Table): 175 label = f"FROM {node.expression.this}" 176 title = f"<pre>SELECT {node.name} FROM {node.expression.this}</pre>" 177 group = 1 178 else: 179 label = node.expression.sql(pretty=True, dialect=dialect) 180 source = node.source.transform( 181 lambda n: exp.Tag(this=n, prefix="<b>", postfix="</b>") 182 if n is node.expression 183 else n, 184 copy=False, 185 ).sql(pretty=True, dialect=dialect) 186 title = f"<pre>{source}</pre>" 187 group = 0 188 189 node_id = id(node) 190 191 self.nodes[node_id] = { 192 "id": node_id, 193 "label": label, 194 "title": title, 195 "group": group, 196 } 197 198 for d in node.downstream: 199 self.edges.append({"from": node_id, "to": id(d)}) 200 201 def __str__(self): 202 nodes = json.dumps(list(self.nodes.values())) 203 edges = json.dumps(self.edges) 204 options = json.dumps(self.options) 205 imports = ( 206 """<script type="text/javascript" src="https://unpkg.com/vis-data@latest/peer/umd/vis-data.min.js"></script> 207 <script type="text/javascript" src="https://unpkg.com/vis-network@latest/peer/umd/vis-network.min.js"></script> 208 <link rel="stylesheet" type="text/css" 
href="https://unpkg.com/vis-network/styles/vis-network.min.css" />""" 209 if self.imports 210 else "" 211 ) 212 213 return f"""<div> 214 <div id="sqlglot-lineage"></div> 215 {imports} 216 <script type="text/javascript"> 217 var nodes = new vis.DataSet({nodes}) 218 nodes.forEach(row => row["title"] = new DOMParser().parseFromString(row["title"], "text/html").body.childNodes[0]) 219 220 new vis.Network( 221 document.getElementById("sqlglot-lineage"), 222 {{ 223 nodes: nodes, 224 edges: new vis.DataSet({edges}) 225 }}, 226 {options}, 227 ) 228 </script> 229</div>""" 230 231 def _repr_html_(self) -> str: 232 return self.__str__()
@dataclass(frozen=True)
class
Node:
@dataclass(frozen=True)
class Node:
    """One vertex of a column lineage graph.

    Attributes:
        name: Display name; `lineage()` uses "scope.column" (or just the column name at the
            root) for projections and the column's SQL (e.g. "t.a") for table leaves.
        expression: The expression this vertex stands for.
        source: The query (or table) that `expression` is taken from.
        downstream: The vertices this one is derived from.
    """

    name: str
    expression: exp.Expression
    source: exp.Expression
    downstream: t.List[Node] = field(default_factory=list)

    def walk(self) -> t.Iterator[Node]:
        """Pre-order traversal: this vertex first, then each downstream subtree in order."""
        yield self
        for child in self.downstream:
            # Non-Node entries are yielded as-is rather than descended into.
            yield from child.walk() if isinstance(child, Node) else (child,)

    def to_html(self, **opts) -> LineageHTML:
        """Build the HTML view of this graph; `opts` are passed through to `LineageHTML`."""
        return LineageHTML(self, **opts)
Node( name: str, expression: sqlglot.expressions.Expression, source: sqlglot.expressions.Expression, downstream: List[sqlglot.lineage.Node] = <factory>)
def
lineage( column: str | sqlglot.expressions.Column, sql: str | sqlglot.expressions.Expression, schema: Union[Dict, sqlglot.schema.Schema, NoneType] = None, sources: Optional[Dict[str, str | sqlglot.expressions.Subqueryable]] = None, rules: Sequence[Callable] = (qualify_tables, qualify_columns, expand_laterals), dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None) -> sqlglot.lineage.Node:
def lineage(
    column: str | exp.Column,
    sql: str | exp.Expression,
    schema: t.Optional[t.Dict | Schema] = None,
    sources: t.Optional[t.Dict[str, str | exp.Subqueryable]] = None,
    rules: t.Sequence[t.Callable] = (qualify_tables, qualify_columns, expand_laterals),
    dialect: DialectType = None,
) -> Node:
    """Build the lineage graph for a column of a SQL query.

    Args:
        column: The column to build the lineage for.
        sql: The SQL string or expression.
        schema: The schema of tables.
        sources: A mapping from names to queries (SQL strings or expressions); references to
            these names in `sql` are expanded with `exp.expand` so lineage continues through them.
        rules: Optimizer rules to apply; by default `qualify_tables`, `qualify_columns` and
            `expand_laterals`.
        dialect: The dialect of input SQL.

    Returns:
        The lineage node for `column`; its `downstream` nodes trace the columns and tables
        it is derived from.
    """

    expression = maybe_parse(sql, dialect=dialect)

    if sources:
        expression = exp.expand(
            expression,
            {
                k: t.cast(exp.Subqueryable, maybe_parse(v, dialect=dialect))
                for k, v in sources.items()
            },
        )

    optimized = optimize(expression, schema=schema, rules=rules)
    scope = build_scope(optimized)
    # Leaf nodes for columns read from tables, keyed by the column's SQL (e.g. "t.a").
    # Keying per column (not per table) gives each distinct column its own node, while
    # repeated references to the same column still share one.
    tables: t.Dict[str, Node] = {}

    def select_index(scope: Scope, column_name: str) -> int:
        """Position of `column_name` among the projections of the left-most UNION branch."""
        while isinstance(scope.expression, exp.Union):
            scope = scope.union_scopes[0]
        return next(
            i for i, select in enumerate(scope.selects) if select.alias_or_name == column_name
        )

    def to_node(
        column_name: str,
        scope: Scope,
        scope_name: t.Optional[str] = None,
        upstream: t.Optional[Node] = None,
        index: t.Optional[int] = None,
    ) -> Node:
        if isinstance(scope.expression, exp.Union):
            # UNION combines branches by position; only the left-most branch is guaranteed
            # to carry the output column's name, so resolve the position once and reuse it.
            if index is None:
                index = select_index(scope, column_name)
            for union_scope in scope.union_scopes:
                node = to_node(
                    column_name,
                    scope=union_scope,
                    scope_name=scope_name,
                    upstream=upstream,
                    index=index,
                )
            # The last branch's node is returned; the others hang off `upstream` (if any).
            return node

        if index is None:
            select = next(s for s in scope.selects if s.alias_or_name == column_name)
        else:
            select = scope.selects[index]

        # Optimize a query that projects only this column, so the node's source shows
        # just the SQL relevant to it.
        source = optimize(scope.expression.select(select, append=False), schema=schema, rules=rules)
        select = source.selects[0]

        node = Node(
            name=f"{scope_name}.{column_name}" if scope_name else column_name,
            source=source,
            expression=select,
        )

        if upstream is not None:
            upstream.downstream.append(node)

        # dict.fromkeys de-duplicates exactly like set() but keeps first-seen order,
        # so the downstream order does not depend on hash randomization.
        for c in dict.fromkeys(select.find_all(exp.Column)):
            table = c.table
            child = scope.sources[table]

            if isinstance(child, Scope):
                to_node(c.name, scope=child, scope_name=table, upstream=node)
            else:
                key = c.sql()
                if key not in tables:
                    tables[key] = Node(name=key, source=child, expression=child)
                node.downstream.append(tables[key])

        return node

    return to_node(column if isinstance(column, str) else column.name, scope)
Build the lineage graph for a column of a SQL query.
Arguments:
- column: The column to build the lineage for.
- sql: The SQL string or expression.
- schema: The schema of tables.
- sources: A mapping from names to queries (SQL strings or expressions); references to these names in `sql` are expanded into subqueries so that lineage continues through them.
- rules: Optimizer rules to apply; by default `qualify_tables`, `qualify_columns` and `expand_laterals`.
- dialect: The dialect of input SQL.
Returns:
The lineage node for `column`; its `downstream` nodes trace the columns and tables it is derived from.
class
LineageHTML:
class LineageHTML:
    """Node to HTML generator using vis.js.

    https://visjs.github.io/vis-network/docs/network/
    """

    def __init__(
        self,
        node: Node,
        dialect: DialectType = None,
        imports: bool = True,
        **opts: t.Any,
    ):
        """Collect the vis.js nodes and edges for the lineage graph rooted at `node`.

        Args:
            node: Root of the lineage graph to render.
            dialect: Dialect used to generate the SQL shown in labels and tooltips.
            imports: Whether `__str__` emits the vis.js <script>/<link> tags.
            **opts: vis-network options; each top-level key replaces the default below.
        """
        import html

        self.node = node
        self.imports = imports

        self.options = {
            "height": "500px",
            "width": "100%",
            "layout": {
                "hierarchical": {
                    "enabled": True,
                    "nodeSpacing": 200,
                    "sortMethod": "directed",
                },
            },
            "interaction": {
                "dragNodes": False,
                "selectable": False,
            },
            "physics": {
                "enabled": False,
            },
            "edges": {
                "arrows": "to",
            },
            "nodes": {
                "font": "20px monaco",
                "shape": "box",
                "widthConstraint": {
                    "maximum": 300,
                },
            },
            **opts,
        }

        self.nodes = {}
        self.edges = []

        # Placeholders for the <b>...</b> highlight, so the SQL text can be HTML-escaped
        # without escaping the highlight tags themselves.
        open_marker, close_marker = "\x01", "\x02"

        for lineage_node in node.walk():
            if isinstance(lineage_node.expression, exp.Table):
                table = lineage_node.expression.this
                label = f"FROM {table}"
                query = f"SELECT {lineage_node.name} FROM {table}"
                title = f"<pre>{html.escape(query)}</pre>"
                group = 1
            else:
                label = lineage_node.expression.sql(pretty=True, dialect=dialect)
                target = lineage_node.expression
                # transform(copy=False) wraps `target` in place (identity with `target` would
                # not survive a copy); the wrap is undone below so Node.source is unchanged.
                highlighted = lineage_node.source.transform(
                    lambda n: exp.Tag(this=n, prefix=open_marker, postfix=close_marker)
                    if n is target
                    else n,
                    copy=False,
                )
                try:
                    source_sql = highlighted.sql(pretty=True, dialect=dialect)
                finally:
                    lineage_node.source.transform(
                        lambda n: n.this if isinstance(n, exp.Tag) and n.this is target else n,
                        copy=False,
                    )
                # The title is parsed as HTML in the browser, so escape "<", "&", ... first.
                escaped = (
                    html.escape(source_sql)
                    .replace(open_marker, "<b>")
                    .replace(close_marker, "</b>")
                )
                title = f"<pre>{escaped}</pre>"
                group = 0

            node_id = id(lineage_node)

            self.nodes[node_id] = {
                "id": node_id,
                "label": label,
                "title": title,
                "group": group,
            }

            for d in lineage_node.downstream:
                self.edges.append({"from": node_id, "to": id(d)})

    def __str__(self):
        """Return a self-contained HTML snippet that draws the graph with vis-network."""
        import uuid

        def to_js(value: t.Any) -> str:
            # json.dumps leaves "</" as-is, which would let a value containing
            # "</script>" terminate the inline script early.
            return json.dumps(value).replace("</", "<\\/")

        nodes = to_js(list(self.nodes.values()))
        edges = to_js(self.edges)
        options = to_js(self.options)
        # A fresh id per rendering: getElementById returns the first match, so a fixed id
        # would make every graph on the page (e.g. several notebook cells) draw into one div.
        container_id = f"sqlglot-lineage-{uuid.uuid4().hex}"
        imports = (
            """<script type="text/javascript" src="https://unpkg.com/vis-data@latest/peer/umd/vis-data.min.js"></script>
  <script type="text/javascript" src="https://unpkg.com/vis-network@latest/peer/umd/vis-network.min.js"></script>
  <link rel="stylesheet" type="text/css" href="https://unpkg.com/vis-network/styles/vis-network.min.css" />"""
            if self.imports
            else ""
        )

        return f"""<div>
  <div id="{container_id}"></div>
  {imports}
  <script type="text/javascript">
    var nodes = new vis.DataSet({nodes})
    nodes.forEach(row => row["title"] = new DOMParser().parseFromString(row["title"], "text/html").body.childNodes[0])

    new vis.Network(
        document.getElementById("{container_id}"),
        {{
            nodes: nodes,
            edges: new vis.DataSet({edges})
        }},
        {options},
    )
  </script>
</div>"""

    def _repr_html_(self) -> str:
        return self.__str__()
Node to HTML generator using vis.js.
LineageHTML( node: sqlglot.lineage.Node, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None, imports: bool = True, **opts: Any)
def __init__(
    self,
    node: Node,
    dialect: DialectType = None,
    imports: bool = True,
    **opts: t.Any,
):
    """Collect the vis.js nodes and edges for the lineage graph rooted at `node`.

    Args:
        node: Root of the lineage graph to render.
        dialect: Dialect used to generate the SQL shown in labels and tooltips.
        imports: Whether the vis.js <script>/<link> tags are emitted when rendering.
        **opts: vis-network options; each top-level key replaces the default below.
    """
    import html

    self.node = node
    self.imports = imports

    self.options = {
        "height": "500px",
        "width": "100%",
        "layout": {
            "hierarchical": {
                "enabled": True,
                "nodeSpacing": 200,
                "sortMethod": "directed",
            },
        },
        "interaction": {
            "dragNodes": False,
            "selectable": False,
        },
        "physics": {
            "enabled": False,
        },
        "edges": {
            "arrows": "to",
        },
        "nodes": {
            "font": "20px monaco",
            "shape": "box",
            "widthConstraint": {
                "maximum": 300,
            },
        },
        **opts,
    }

    self.nodes = {}
    self.edges = []

    # Placeholders for the <b>...</b> highlight, so the SQL text can be HTML-escaped
    # without escaping the highlight tags themselves.
    open_marker, close_marker = "\x01", "\x02"

    for lineage_node in node.walk():
        if isinstance(lineage_node.expression, exp.Table):
            table = lineage_node.expression.this
            label = f"FROM {table}"
            query = f"SELECT {lineage_node.name} FROM {table}"
            title = f"<pre>{html.escape(query)}</pre>"
            group = 1
        else:
            label = lineage_node.expression.sql(pretty=True, dialect=dialect)
            target = lineage_node.expression
            # transform(copy=False) wraps `target` in place (identity with `target` would
            # not survive a copy); the wrap is undone below so Node.source is unchanged.
            highlighted = lineage_node.source.transform(
                lambda n: exp.Tag(this=n, prefix=open_marker, postfix=close_marker)
                if n is target
                else n,
                copy=False,
            )
            try:
                source_sql = highlighted.sql(pretty=True, dialect=dialect)
            finally:
                lineage_node.source.transform(
                    lambda n: n.this if isinstance(n, exp.Tag) and n.this is target else n,
                    copy=False,
                )
            # The title is parsed as HTML in the browser, so escape "<", "&", ... first.
            escaped = (
                html.escape(source_sql)
                .replace(open_marker, "<b>")
                .replace(close_marker, "</b>")
            )
            title = f"<pre>{escaped}</pre>"
            group = 0

        node_id = id(lineage_node)

        self.nodes[node_id] = {
            "id": node_id,
            "label": label,
            "title": title,
            "group": group,
        }

        for d in lineage_node.downstream:
            self.edges.append({"from": node_id, "to": id(d)})