summaryrefslogtreecommitdiffstats
path: root/tests/test_lineage.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_lineage.py')
-rw-r--r--tests/test_lineage.py74
1 files changed, 74 insertions, 0 deletions
diff --git a/tests/test_lineage.py b/tests/test_lineage.py
index 0fd9da8..25329e2 100644
--- a/tests/test_lineage.py
+++ b/tests/test_lineage.py
@@ -199,3 +199,77 @@ class TestLineage(unittest.TestCase):
"SELECT x FROM (SELECT ax AS x FROM a UNION SELECT bx FROM b UNION SELECT cx FROM c)",
)
assert len(node.downstream) == 3
+
+ def test_lineage_lateral_flatten(self) -> None:
+ node = lineage(
+ "VALUE",
+ "SELECT FLATTENED.VALUE FROM TEST_TABLE, LATERAL FLATTEN(INPUT => RESULT, OUTER => TRUE) FLATTENED",
+ dialect="snowflake",
+ )
+ self.assertEqual(node.name, "VALUE")
+
+ downstream = node.downstream[0]
+ self.assertEqual(downstream.name, "FLATTENED.VALUE")
+ self.assertEqual(
+ downstream.source.sql(dialect="snowflake"),
+ "LATERAL FLATTEN(INPUT => TEST_TABLE.RESULT, OUTER => TRUE) AS FLATTENED(SEQ, KEY, PATH, INDEX, VALUE, THIS)",
+ )
+ self.assertEqual(
+ downstream.expression.sql(dialect="snowflake"),
+ "VALUE",
+ )
+ self.assertEqual(len(downstream.downstream), 1)
+
+ downstream = downstream.downstream[0]
+ self.assertEqual(downstream.name, "TEST_TABLE.RESULT")
+ self.assertEqual(downstream.source.sql(dialect="snowflake"), "TEST_TABLE AS TEST_TABLE")
+
+ def test_subquery(self) -> None:
+ node = lineage(
+ "output",
+ "SELECT (SELECT max(t3.my_column) my_column FROM foo t3) AS output FROM table3",
+ )
+ self.assertEqual(node.name, "SUBQUERY")
+ node = node.downstream[0]
+ self.assertEqual(node.name, "my_column")
+ node = node.downstream[0]
+ self.assertEqual(node.name, "t3.my_column")
+ self.assertEqual(node.source.sql(), "foo AS t3")
+
+ def test_lineage_cte_union(self) -> None:
+ query = """
+ WITH dataset AS (
+ SELECT *
+ FROM catalog.db.table_a
+
+ UNION
+
+ SELECT *
+ FROM catalog.db.table_b
+ )
+
+ SELECT x, created_at FROM dataset;
+ """
+ node = lineage("x", query)
+
+ self.assertEqual(node.name, "x")
+
+ downstream_a = node.downstream[0]
+ self.assertEqual(downstream_a.name, "0")
+ self.assertEqual(downstream_a.source.sql(), "SELECT * FROM catalog.db.table_a AS table_a")
+ downstream_b = node.downstream[1]
+ self.assertEqual(downstream_b.name, "0")
+ self.assertEqual(downstream_b.source.sql(), "SELECT * FROM catalog.db.table_b AS table_b")
+
+ def test_select_star(self) -> None:
+ node = lineage("x", "SELECT x from (SELECT * from table_a)")
+
+ self.assertEqual(node.name, "x")
+
+ downstream = node.downstream[0]
+ self.assertEqual(downstream.name, "_q_0.x")
+ self.assertEqual(downstream.source.sql(), "SELECT * FROM table_a AS table_a")
+
+ downstream = downstream.downstream[0]
+ self.assertEqual(downstream.name, "*")
+ self.assertEqual(downstream.source.sql(), "table_a AS table_a")