Edit on GitHub

sqlglot.optimizer.qualify_tables

 1import itertools
 2
 3from sqlglot import alias, exp
 4from sqlglot.helper import csv_reader
 5from sqlglot.optimizer.scope import Scope, traverse_scope
 6
 7
 8def qualify_tables(expression, db=None, catalog=None, schema=None):
 9    """
10    Rewrite sqlglot AST to have fully qualified tables.
11
12    Example:
13        >>> import sqlglot
14        >>> expression = sqlglot.parse_one("SELECT 1 FROM tbl")
15        >>> qualify_tables(expression, db="db").sql()
16        'SELECT 1 FROM db.tbl AS tbl'
17
18    Args:
19        expression (sqlglot.Expression): expression to qualify
20        db (str): Database name
21        catalog (str): Catalog name
22        schema: A schema to populate
23    Returns:
24        sqlglot.Expression: qualified expression
25    """
26    sequence = itertools.count()
27
28    next_name = lambda: f"_q_{next(sequence)}"
29
30    for scope in traverse_scope(expression):
31        for derived_table in itertools.chain(scope.ctes, scope.derived_tables):
32            if not derived_table.args.get("alias"):
33                alias_ = f"_q_{next(sequence)}"
34                derived_table.set("alias", exp.TableAlias(this=exp.to_identifier(alias_)))
35                scope.rename_source(None, alias_)
36
37        for source in scope.sources.values():
38            if isinstance(source, exp.Table):
39                identifier = isinstance(source.this, exp.Identifier)
40
41                if identifier:
42                    if not source.args.get("db"):
43                        source.set("db", exp.to_identifier(db))
44                    if not source.args.get("catalog"):
45                        source.set("catalog", exp.to_identifier(catalog))
46
47                if not source.alias:
48                    source = source.replace(
49                        alias(
50                            source.copy(),
51                            source.this if identifier else next_name(),
52                            table=True,
53                        )
54                    )
55
56                if schema and isinstance(source.this, exp.ReadCSV):
57                    with csv_reader(source.this) as reader:
58                        header = next(reader)
59                        columns = next(reader)
60                        schema.add_table(
61                            source, {k: type(v).__name__ for k, v in zip(header, columns)}
62                        )
63            elif isinstance(source, Scope) and source.is_udtf:
64                udtf = source.expression
65                table_alias = udtf.args.get("alias") or exp.TableAlias(this=next_name())
66                udtf.set("alias", table_alias)
67
68                if not table_alias.name:
69                    table_alias.set("this", next_name())
70
71    return expression
def qualify_tables(expression, db=None, catalog=None, schema=None):
    """
    Rewrite sqlglot AST to have fully qualified tables.

    Missing "db" / "catalog" args are filled in on table sources, and every
    table, CTE, derived table and UDTF ends up with an alias (generated as
    `_q_<n>` when none can be derived from the table name).

    Example:
        >>> import sqlglot
        >>> expression = sqlglot.parse_one("SELECT 1 FROM tbl")
        >>> qualify_tables(expression, db="db").sql()
        'SELECT 1 FROM db.tbl AS tbl'

    Args:
        expression (sqlglot.Expression): expression to qualify; its nodes are
            modified in place
        db (str): Database name set on tables that have no "db" arg
        catalog (str): Catalog name set on tables that have no "catalog" arg
        schema: A schema to populate; when given, `add_table` is called on it
            for every table source whose `this` is a `ReadCSV` expression
    Returns:
        sqlglot.Expression: qualified expression (the object passed in)
    """
    counter = itertools.count()

    def next_name():
        # All generated aliases draw from one counter so they never collide.
        return f"_q_{next(counter)}"

    qualifiers = (("db", db), ("catalog", catalog))

    for scope in traverse_scope(expression):
        for derived in itertools.chain(scope.ctes, scope.derived_tables):
            if derived.args.get("alias"):
                continue

            generated = next_name()
            derived.set("alias", exp.TableAlias(this=exp.to_identifier(generated)))
            # The unaliased source is registered under None; re-key it.
            scope.rename_source(None, generated)

        for source in scope.sources.values():
            if isinstance(source, exp.Table):
                named = isinstance(source.this, exp.Identifier)

                # Only tables named by a plain identifier get db / catalog.
                if named:
                    for arg_name, value in qualifiers:
                        if not source.args.get(arg_name):
                            source.set(arg_name, exp.to_identifier(value))

                if not source.alias:
                    aliased = alias(
                        source.copy(),
                        source.this if named else next_name(),
                        table=True,
                    )
                    source = source.replace(aliased)

                if schema and isinstance(source.this, exp.ReadCSV):
                    with csv_reader(source.this) as reader:
                        # Row one: column names. Row two: sample values whose
                        # Python type names are recorded as column types.
                        header = next(reader)
                        sample = next(reader)
                        schema.add_table(
                            source, {k: type(v).__name__ for k, v in zip(header, sample)}
                        )
            elif isinstance(source, Scope) and source.is_udtf:
                udtf = source.expression
                table_alias = udtf.args.get("alias")

                if not table_alias:
                    # Build the alias from an Identifier, matching what is
                    # done for derived tables, rather than a raw string.
                    table_alias = exp.TableAlias(this=exp.to_identifier(next_name()))
                udtf.set("alias", table_alias)

                # An alias node may exist without a name; give it one.
                if not table_alias.name:
                    table_alias.set("this", exp.to_identifier(next_name()))

    return expression

Rewrite sqlglot AST to have fully qualified tables.

Example:
>>> import sqlglot
>>> expression = sqlglot.parse_one("SELECT 1 FROM tbl")
>>> qualify_tables(expression, db="db").sql()
'SELECT 1 FROM db.tbl AS tbl'
Arguments:
  • expression (sqlglot.Expression): expression to qualify
  • db (str): Database name
  • catalog (str): Catalog name
  • schema: A schema to populate; tables read from CSV sources are added to it
Returns:

sqlglot.Expression: qualified expression