sqlglot.optimizer.qualify_tables
1import itertools 2 3from sqlglot import alias, exp 4from sqlglot.helper import csv_reader 5from sqlglot.optimizer.scope import Scope, traverse_scope 6 7 8def qualify_tables(expression, db=None, catalog=None, schema=None): 9 """ 10 Rewrite sqlglot AST to have fully qualified tables. 11 12 Example: 13 >>> import sqlglot 14 >>> expression = sqlglot.parse_one("SELECT 1 FROM tbl") 15 >>> qualify_tables(expression, db="db").sql() 16 'SELECT 1 FROM db.tbl AS tbl' 17 18 Args: 19 expression (sqlglot.Expression): expression to qualify 20 db (str): Database name 21 catalog (str): Catalog name 22 schema: A schema to populate 23 Returns: 24 sqlglot.Expression: qualified expression 25 """ 26 sequence = itertools.count() 27 28 next_name = lambda: f"_q_{next(sequence)}" 29 30 for scope in traverse_scope(expression): 31 for derived_table in itertools.chain(scope.ctes, scope.derived_tables): 32 if not derived_table.args.get("alias"): 33 alias_ = f"_q_{next(sequence)}" 34 derived_table.set("alias", exp.TableAlias(this=exp.to_identifier(alias_))) 35 scope.rename_source(None, alias_) 36 37 for source in scope.sources.values(): 38 if isinstance(source, exp.Table): 39 identifier = isinstance(source.this, exp.Identifier) 40 41 if identifier: 42 if not source.args.get("db"): 43 source.set("db", exp.to_identifier(db)) 44 if not source.args.get("catalog"): 45 source.set("catalog", exp.to_identifier(catalog)) 46 47 if not source.alias: 48 source = source.replace( 49 alias( 50 source.copy(), 51 source.this if identifier else next_name(), 52 table=True, 53 ) 54 ) 55 56 if schema and isinstance(source.this, exp.ReadCSV): 57 with csv_reader(source.this) as reader: 58 header = next(reader) 59 columns = next(reader) 60 schema.add_table( 61 source, {k: type(v).__name__ for k, v in zip(header, columns)} 62 ) 63 elif isinstance(source, Scope) and source.is_udtf: 64 udtf = source.expression 65 table_alias = udtf.args.get("alias") or exp.TableAlias(this=next_name()) 66 udtf.set("alias", table_alias) 67 68 if not table_alias.name: 69 table_alias.set("this", next_name()) 70 71 return expression
def
qualify_tables(expression, db=None, catalog=None, schema=None):
9def qualify_tables(expression, db=None, catalog=None, schema=None): 10 """ 11 Rewrite sqlglot AST to have fully qualified tables. 12 13 Example: 14 >>> import sqlglot 15 >>> expression = sqlglot.parse_one("SELECT 1 FROM tbl") 16 >>> qualify_tables(expression, db="db").sql() 17 'SELECT 1 FROM db.tbl AS tbl' 18 19 Args: 20 expression (sqlglot.Expression): expression to qualify 21 db (str): Database name 22 catalog (str): Catalog name 23 schema: A schema to populate 24 Returns: 25 sqlglot.Expression: qualified expression 26 """ 27 sequence = itertools.count() 28 29 next_name = lambda: f"_q_{next(sequence)}" 30 31 for scope in traverse_scope(expression): 32 for derived_table in itertools.chain(scope.ctes, scope.derived_tables): 33 if not derived_table.args.get("alias"): 34 alias_ = f"_q_{next(sequence)}" 35 derived_table.set("alias", exp.TableAlias(this=exp.to_identifier(alias_))) 36 scope.rename_source(None, alias_) 37 38 for source in scope.sources.values(): 39 if isinstance(source, exp.Table): 40 identifier = isinstance(source.this, exp.Identifier) 41 42 if identifier: 43 if not source.args.get("db"): 44 source.set("db", exp.to_identifier(db)) 45 if not source.args.get("catalog"): 46 source.set("catalog", exp.to_identifier(catalog)) 47 48 if not source.alias: 49 source = source.replace( 50 alias( 51 source.copy(), 52 source.this if identifier else next_name(), 53 table=True, 54 ) 55 ) 56 57 if schema and isinstance(source.this, exp.ReadCSV): 58 with csv_reader(source.this) as reader: 59 header = next(reader) 60 columns = next(reader) 61 schema.add_table( 62 source, {k: type(v).__name__ for k, v in zip(header, columns)} 63 ) 64 elif isinstance(source, Scope) and source.is_udtf: 65 udtf = source.expression 66 table_alias = udtf.args.get("alias") or exp.TableAlias(this=next_name()) 67 udtf.set("alias", table_alias) 68 69 if not table_alias.name: 70 table_alias.set("this", next_name()) 71 72 return expression
Rewrite sqlglot AST to have fully qualified tables.
Example:
>>> import sqlglot >>> expression = sqlglot.parse_one("SELECT 1 FROM tbl") >>> qualify_tables(expression, db="db").sql() 'SELECT 1 FROM db.tbl AS tbl'
Arguments:
- expression (sqlglot.Expression): expression to qualify
- db (str): Database name
- catalog (str): Catalog name
- schema: A schema to populate
Returns:
sqlglot.Expression: qualified expression