diff options
Diffstat (limited to 'tests/test_lineage.py')
-rw-r--r-- | tests/test_lineage.py | 74 |
1 files changed, 74 insertions, 0 deletions
diff --git a/tests/test_lineage.py b/tests/test_lineage.py index 0fd9da8..25329e2 100644 --- a/tests/test_lineage.py +++ b/tests/test_lineage.py @@ -199,3 +199,77 @@ class TestLineage(unittest.TestCase): "SELECT x FROM (SELECT ax AS x FROM a UNION SELECT bx FROM b UNION SELECT cx FROM c)", ) assert len(node.downstream) == 3 + + def test_lineage_lateral_flatten(self) -> None: + node = lineage( + "VALUE", + "SELECT FLATTENED.VALUE FROM TEST_TABLE, LATERAL FLATTEN(INPUT => RESULT, OUTER => TRUE) FLATTENED", + dialect="snowflake", + ) + self.assertEqual(node.name, "VALUE") + + downstream = node.downstream[0] + self.assertEqual(downstream.name, "FLATTENED.VALUE") + self.assertEqual( + downstream.source.sql(dialect="snowflake"), + "LATERAL FLATTEN(INPUT => TEST_TABLE.RESULT, OUTER => TRUE) AS FLATTENED(SEQ, KEY, PATH, INDEX, VALUE, THIS)", + ) + self.assertEqual( + downstream.expression.sql(dialect="snowflake"), + "VALUE", + ) + self.assertEqual(len(downstream.downstream), 1) + + downstream = downstream.downstream[0] + self.assertEqual(downstream.name, "TEST_TABLE.RESULT") + self.assertEqual(downstream.source.sql(dialect="snowflake"), "TEST_TABLE AS TEST_TABLE") + + def test_subquery(self) -> None: + node = lineage( + "output", + "SELECT (SELECT max(t3.my_column) my_column FROM foo t3) AS output FROM table3", + ) + self.assertEqual(node.name, "SUBQUERY") + node = node.downstream[0] + self.assertEqual(node.name, "my_column") + node = node.downstream[0] + self.assertEqual(node.name, "t3.my_column") + self.assertEqual(node.source.sql(), "foo AS t3") + + def test_lineage_cte_union(self) -> None: + query = """ + WITH dataset AS ( + SELECT * + FROM catalog.db.table_a + + UNION + + SELECT * + FROM catalog.db.table_b + ) + + SELECT x, created_at FROM dataset; + """ + node = lineage("x", query) + + self.assertEqual(node.name, "x") + + downstream_a = node.downstream[0] + self.assertEqual(downstream_a.name, "0") + self.assertEqual(downstream_a.source.sql(), "SELECT * FROM catalog.db.table_a AS table_a") + downstream_b = node.downstream[1] + self.assertEqual(downstream_b.name, "0") + self.assertEqual(downstream_b.source.sql(), "SELECT * FROM catalog.db.table_b AS table_b") + + def test_select_star(self) -> None: + node = lineage("x", "SELECT x from (SELECT * from table_a)") + + self.assertEqual(node.name, "x") + + downstream = node.downstream[0] + self.assertEqual(downstream.name, "_q_0.x") + self.assertEqual(downstream.source.sql(), "SELECT * FROM table_a AS table_a") + + downstream = downstream.downstream[0] + self.assertEqual(downstream.name, "*") + self.assertEqual(downstream.source.sql(), "table_a AS table_a") |