summaryrefslogtreecommitdiffstats
path: root/tests/test_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_async.py')
-rw-r--r--tests/test_async.py660
1 files changed, 660 insertions, 0 deletions
diff --git a/tests/test_async.py b/tests/test_async.py
new file mode 100644
index 0000000..c9ba70c
--- /dev/null
+++ b/tests/test_async.py
@@ -0,0 +1,660 @@
+import asyncio
+
+import pytest
+
+from jinja2 import ChainableUndefined
+from jinja2 import DictLoader
+from jinja2 import Environment
+from jinja2 import Template
+from jinja2.async_utils import auto_aiter
+from jinja2.exceptions import TemplateNotFound
+from jinja2.exceptions import TemplatesNotFound
+from jinja2.exceptions import UndefinedError
+from jinja2.nativetypes import NativeEnvironment
+
+
+def test_basic_async():
+ t = Template(
+ "{% for item in [1, 2, 3] %}[{{ item }}]{% endfor %}", enable_async=True
+ )
+
+ async def func():
+ return await t.render_async()
+
+ rv = asyncio.run(func())
+ assert rv == "[1][2][3]"
+
+
+def test_await_on_calls():
+ t = Template("{{ async_func() + normal_func() }}", enable_async=True)
+
+ async def async_func():
+ return 42
+
+ def normal_func():
+ return 23
+
+ async def func():
+ return await t.render_async(async_func=async_func, normal_func=normal_func)
+
+ rv = asyncio.run(func())
+ assert rv == "65"
+
+
+def test_await_on_calls_normal_render():
+ t = Template("{{ async_func() + normal_func() }}", enable_async=True)
+
+ async def async_func():
+ return 42
+
+ def normal_func():
+ return 23
+
+ rv = t.render(async_func=async_func, normal_func=normal_func)
+ assert rv == "65"
+
+
+def test_await_and_macros():
+ t = Template(
+ "{% macro foo(x) %}[{{ x }}][{{ async_func() }}]{% endmacro %}{{ foo(42) }}",
+ enable_async=True,
+ )
+
+ async def async_func():
+ return 42
+
+ async def func():
+ return await t.render_async(async_func=async_func)
+
+ rv = asyncio.run(func())
+ assert rv == "[42][42]"
+
+
+def test_async_blocks():
+ t = Template(
+ "{% block foo %}<Test>{% endblock %}{{ self.foo() }}",
+ enable_async=True,
+ autoescape=True,
+ )
+
+ async def func():
+ return await t.render_async()
+
+ rv = asyncio.run(func())
+ assert rv == "<Test><Test>"
+
+
+def test_async_generate():
+ t = Template("{% for x in [1, 2, 3] %}{{ x }}{% endfor %}", enable_async=True)
+ rv = list(t.generate())
+ assert rv == ["1", "2", "3"]
+
+
+def test_async_iteration_in_templates():
+ t = Template("{% for x in rng %}{{ x }}{% endfor %}", enable_async=True)
+
+ async def async_iterator():
+ for item in [1, 2, 3]:
+ yield item
+
+ rv = list(t.generate(rng=async_iterator()))
+ assert rv == ["1", "2", "3"]
+
+
+def test_async_iteration_in_templates_extended():
+ t = Template(
+ "{% for x in rng %}{{ loop.index0 }}/{{ x }}{% endfor %}", enable_async=True
+ )
+ stream = t.generate(rng=auto_aiter(range(1, 4)))
+ assert next(stream) == "0"
+ assert "".join(stream) == "/11/22/3"
+
+
+@pytest.fixture
+def test_env_async():
+ env = Environment(
+ loader=DictLoader(
+ dict(
+ module="{% macro test() %}[{{ foo }}|{{ bar }}]{% endmacro %}",
+ header="[{{ foo }}|{{ 23 }}]",
+ o_printer="({{ o }})",
+ )
+ ),
+ enable_async=True,
+ )
+ env.globals["bar"] = 23
+ return env
+
+
+class TestAsyncImports:
+ def test_context_imports(self, test_env_async):
+ t = test_env_async.from_string('{% import "module" as m %}{{ m.test() }}')
+ assert t.render(foo=42) == "[|23]"
+ t = test_env_async.from_string(
+ '{% import "module" as m without context %}{{ m.test() }}'
+ )
+ assert t.render(foo=42) == "[|23]"
+ t = test_env_async.from_string(
+ '{% import "module" as m with context %}{{ m.test() }}'
+ )
+ assert t.render(foo=42) == "[42|23]"
+ t = test_env_async.from_string('{% from "module" import test %}{{ test() }}')
+ assert t.render(foo=42) == "[|23]"
+ t = test_env_async.from_string(
+ '{% from "module" import test without context %}{{ test() }}'
+ )
+ assert t.render(foo=42) == "[|23]"
+ t = test_env_async.from_string(
+ '{% from "module" import test with context %}{{ test() }}'
+ )
+ assert t.render(foo=42) == "[42|23]"
+
+ def test_trailing_comma(self, test_env_async):
+ test_env_async.from_string('{% from "foo" import bar, baz with context %}')
+ test_env_async.from_string('{% from "foo" import bar, baz, with context %}')
+ test_env_async.from_string('{% from "foo" import bar, with context %}')
+ test_env_async.from_string('{% from "foo" import bar, with, context %}')
+ test_env_async.from_string('{% from "foo" import bar, with with context %}')
+
+ def test_exports(self, test_env_async):
+ coro = test_env_async.from_string(
+ """
+ {% macro toplevel() %}...{% endmacro %}
+ {% macro __private() %}...{% endmacro %}
+ {% set variable = 42 %}
+ {% for item in [1] %}
+ {% macro notthere() %}{% endmacro %}
+ {% endfor %}
+ """
+ )._get_default_module_async()
+ m = asyncio.run(coro)
+ assert asyncio.run(m.toplevel()) == "..."
+ assert not hasattr(m, "__missing")
+ assert m.variable == 42
+ assert not hasattr(m, "notthere")
+
+ def test_import_with_globals(self, test_env_async):
+ t = test_env_async.from_string(
+ '{% import "module" as m %}{{ m.test() }}', globals={"foo": 42}
+ )
+ assert t.render() == "[42|23]"
+
+ t = test_env_async.from_string('{% import "module" as m %}{{ m.test() }}')
+ assert t.render() == "[|23]"
+
+ def test_import_with_globals_override(self, test_env_async):
+ t = test_env_async.from_string(
+ '{% set foo = 41 %}{% import "module" as m %}{{ m.test() }}',
+ globals={"foo": 42},
+ )
+ assert t.render() == "[42|23]"
+
+ def test_from_import_with_globals(self, test_env_async):
+ t = test_env_async.from_string(
+ '{% from "module" import test %}{{ test() }}',
+ globals={"foo": 42},
+ )
+ assert t.render() == "[42|23]"
+
+
+class TestAsyncIncludes:
+ def test_context_include(self, test_env_async):
+ t = test_env_async.from_string('{% include "header" %}')
+ assert t.render(foo=42) == "[42|23]"
+ t = test_env_async.from_string('{% include "header" with context %}')
+ assert t.render(foo=42) == "[42|23]"
+ t = test_env_async.from_string('{% include "header" without context %}')
+ assert t.render(foo=42) == "[|23]"
+
+ def test_choice_includes(self, test_env_async):
+ t = test_env_async.from_string('{% include ["missing", "header"] %}')
+ assert t.render(foo=42) == "[42|23]"
+
+ t = test_env_async.from_string(
+ '{% include ["missing", "missing2"] ignore missing %}'
+ )
+ assert t.render(foo=42) == ""
+
+ t = test_env_async.from_string('{% include ["missing", "missing2"] %}')
+ pytest.raises(TemplateNotFound, t.render)
+ with pytest.raises(TemplatesNotFound) as e:
+ t.render()
+
+ assert e.value.templates == ["missing", "missing2"]
+ assert e.value.name == "missing2"
+
+ def test_includes(t, **ctx):
+ ctx["foo"] = 42
+ assert t.render(ctx) == "[42|23]"
+
+ t = test_env_async.from_string('{% include ["missing", "header"] %}')
+ test_includes(t)
+ t = test_env_async.from_string("{% include x %}")
+ test_includes(t, x=["missing", "header"])
+ t = test_env_async.from_string('{% include [x, "header"] %}')
+ test_includes(t, x="missing")
+ t = test_env_async.from_string("{% include x %}")
+ test_includes(t, x="header")
+ t = test_env_async.from_string("{% include x %}")
+ test_includes(t, x="header")
+ t = test_env_async.from_string("{% include [x] %}")
+ test_includes(t, x="header")
+
+ def test_include_ignoring_missing(self, test_env_async):
+ t = test_env_async.from_string('{% include "missing" %}')
+ pytest.raises(TemplateNotFound, t.render)
+ for extra in "", "with context", "without context":
+ t = test_env_async.from_string(
+ '{% include "missing" ignore missing ' + extra + " %}"
+ )
+ assert t.render() == ""
+
+ def test_context_include_with_overrides(self, test_env_async):
+ env = Environment(
+ loader=DictLoader(
+ dict(
+ main="{% for item in [1, 2, 3] %}{% include 'item' %}{% endfor %}",
+ item="{{ item }}",
+ )
+ )
+ )
+ assert env.get_template("main").render() == "123"
+
+ def test_unoptimized_scopes(self, test_env_async):
+ t = test_env_async.from_string(
+ """
+ {% macro outer(o) %}
+ {% macro inner() %}
+ {% include "o_printer" %}
+ {% endmacro %}
+ {{ inner() }}
+ {% endmacro %}
+ {{ outer("FOO") }}
+ """
+ )
+ assert t.render().strip() == "(FOO)"
+
+ def test_unoptimized_scopes_autoescape(self):
+ env = Environment(
+ loader=DictLoader({"o_printer": "({{ o }})"}),
+ autoescape=True,
+ enable_async=True,
+ )
+ t = env.from_string(
+ """
+ {% macro outer(o) %}
+ {% macro inner() %}
+ {% include "o_printer" %}
+ {% endmacro %}
+ {{ inner() }}
+ {% endmacro %}
+ {{ outer("FOO") }}
+ """
+ )
+ assert t.render().strip() == "(FOO)"
+
+
+class TestAsyncForLoop:
+ def test_simple(self, test_env_async):
+ tmpl = test_env_async.from_string("{% for item in seq %}{{ item }}{% endfor %}")
+ assert tmpl.render(seq=list(range(10))) == "0123456789"
+
+ def test_else(self, test_env_async):
+ tmpl = test_env_async.from_string(
+ "{% for item in seq %}XXX{% else %}...{% endfor %}"
+ )
+ assert tmpl.render() == "..."
+
+ def test_empty_blocks(self, test_env_async):
+ tmpl = test_env_async.from_string(
+ "<{% for item in seq %}{% else %}{% endfor %}>"
+ )
+ assert tmpl.render() == "<>"
+
+ @pytest.mark.parametrize(
+ "transform", [lambda x: x, iter, reversed, lambda x: (i for i in x), auto_aiter]
+ )
+ def test_context_vars(self, test_env_async, transform):
+ t = test_env_async.from_string(
+ "{% for item in seq %}{{ loop.index }}|{{ loop.index0 }}"
+ "|{{ loop.revindex }}|{{ loop.revindex0 }}|{{ loop.first }}"
+ "|{{ loop.last }}|{{ loop.length }}\n{% endfor %}"
+ )
+ out = t.render(seq=transform([42, 24]))
+ assert out == "1|0|2|1|True|False|2\n2|1|1|0|False|True|2\n"
+
+ def test_cycling(self, test_env_async):
+ tmpl = test_env_async.from_string(
+ """{% for item in seq %}{{
+ loop.cycle('<1>', '<2>') }}{% endfor %}{%
+ for item in seq %}{{ loop.cycle(*through) }}{% endfor %}"""
+ )
+ output = tmpl.render(seq=list(range(4)), through=("<1>", "<2>"))
+ assert output == "<1><2>" * 4
+
+ def test_lookaround(self, test_env_async):
+ tmpl = test_env_async.from_string(
+ """{% for item in seq -%}
+ {{ loop.previtem|default('x') }}-{{ item }}-{{
+ loop.nextitem|default('x') }}|
+ {%- endfor %}"""
+ )
+ output = tmpl.render(seq=list(range(4)))
+ assert output == "x-0-1|0-1-2|1-2-3|2-3-x|"
+
+ def test_changed(self, test_env_async):
+ tmpl = test_env_async.from_string(
+ """{% for item in seq -%}
+ {{ loop.changed(item) }},
+ {%- endfor %}"""
+ )
+ output = tmpl.render(seq=[None, None, 1, 2, 2, 3, 4, 4, 4])
+ assert output == "True,False,True,True,False,True,True,False,False,"
+
+ def test_scope(self, test_env_async):
+ tmpl = test_env_async.from_string("{% for item in seq %}{% endfor %}{{ item }}")
+ output = tmpl.render(seq=list(range(10)))
+ assert not output
+
+ def test_varlen(self, test_env_async):
+ def inner():
+ yield from range(5)
+
+ tmpl = test_env_async.from_string(
+ "{% for item in iter %}{{ item }}{% endfor %}"
+ )
+ output = tmpl.render(iter=inner())
+ assert output == "01234"
+
+ def test_noniter(self, test_env_async):
+ tmpl = test_env_async.from_string("{% for item in none %}...{% endfor %}")
+ pytest.raises(TypeError, tmpl.render)
+
+ def test_recursive(self, test_env_async):
+ tmpl = test_env_async.from_string(
+ """{% for item in seq recursive -%}
+ [{{ item.a }}{% if item.b %}<{{ loop(item.b) }}>{% endif %}]
+ {%- endfor %}"""
+ )
+ assert (
+ tmpl.render(
+ seq=[
+ dict(a=1, b=[dict(a=1), dict(a=2)]),
+ dict(a=2, b=[dict(a=1), dict(a=2)]),
+ dict(a=3, b=[dict(a="a")]),
+ ]
+ )
+ == "[1<[1][2]>][2<[1][2]>][3<[a]>]"
+ )
+
+ def test_recursive_lookaround(self, test_env_async):
+ tmpl = test_env_async.from_string(
+ """{% for item in seq recursive -%}
+ [{{ loop.previtem.a if loop.previtem is defined else 'x' }}.{{
+ item.a }}.{{ loop.nextitem.a if loop.nextitem is defined else 'x'
+ }}{% if item.b %}<{{ loop(item.b) }}>{% endif %}]
+ {%- endfor %}"""
+ )
+ assert (
+ tmpl.render(
+ seq=[
+ dict(a=1, b=[dict(a=1), dict(a=2)]),
+ dict(a=2, b=[dict(a=1), dict(a=2)]),
+ dict(a=3, b=[dict(a="a")]),
+ ]
+ )
+ == "[x.1.2<[x.1.2][1.2.x]>][1.2.3<[x.1.2][1.2.x]>][2.3.x<[x.a.x]>]"
+ )
+
+ def test_recursive_depth0(self, test_env_async):
+ tmpl = test_env_async.from_string(
+ "{% for item in seq recursive %}[{{ loop.depth0 }}:{{ item.a }}"
+ "{% if item.b %}<{{ loop(item.b) }}>{% endif %}]{% endfor %}"
+ )
+ assert (
+ tmpl.render(
+ seq=[
+ dict(a=1, b=[dict(a=1), dict(a=2)]),
+ dict(a=2, b=[dict(a=1), dict(a=2)]),
+ dict(a=3, b=[dict(a="a")]),
+ ]
+ )
+ == "[0:1<[1:1][1:2]>][0:2<[1:1][1:2]>][0:3<[1:a]>]"
+ )
+
+ def test_recursive_depth(self, test_env_async):
+ tmpl = test_env_async.from_string(
+ "{% for item in seq recursive %}[{{ loop.depth }}:{{ item.a }}"
+ "{% if item.b %}<{{ loop(item.b) }}>{% endif %}]{% endfor %}"
+ )
+ assert (
+ tmpl.render(
+ seq=[
+ dict(a=1, b=[dict(a=1), dict(a=2)]),
+ dict(a=2, b=[dict(a=1), dict(a=2)]),
+ dict(a=3, b=[dict(a="a")]),
+ ]
+ )
+ == "[1:1<[2:1][2:2]>][1:2<[2:1][2:2]>][1:3<[2:a]>]"
+ )
+
+ def test_looploop(self, test_env_async):
+ tmpl = test_env_async.from_string(
+ """{% for row in table %}
+ {%- set rowloop = loop -%}
+ {% for cell in row -%}
+ [{{ rowloop.index }}|{{ loop.index }}]
+ {%- endfor %}
+ {%- endfor %}"""
+ )
+ assert tmpl.render(table=["ab", "cd"]) == "[1|1][1|2][2|1][2|2]"
+
+ def test_reversed_bug(self, test_env_async):
+ tmpl = test_env_async.from_string(
+ "{% for i in items %}{{ i }}"
+ "{% if not loop.last %}"
+ ",{% endif %}{% endfor %}"
+ )
+ assert tmpl.render(items=reversed([3, 2, 1])) == "1,2,3"
+
+ def test_loop_errors(self, test_env_async):
+ tmpl = test_env_async.from_string(
+ """{% for item in [1] if loop.index
+ == 0 %}...{% endfor %}"""
+ )
+ pytest.raises(UndefinedError, tmpl.render)
+ tmpl = test_env_async.from_string(
+ """{% for item in [] %}...{% else
+ %}{{ loop }}{% endfor %}"""
+ )
+ assert tmpl.render() == ""
+
+ def test_loop_filter(self, test_env_async):
+ tmpl = test_env_async.from_string(
+ "{% for item in range(10) if item is even %}[{{ item }}]{% endfor %}"
+ )
+ assert tmpl.render() == "[0][2][4][6][8]"
+ tmpl = test_env_async.from_string(
+ """
+ {%- for item in range(10) if item is even %}[{{
+ loop.index }}:{{ item }}]{% endfor %}"""
+ )
+ assert tmpl.render() == "[1:0][2:2][3:4][4:6][5:8]"
+
+ def test_scoped_special_var(self, test_env_async):
+ t = test_env_async.from_string(
+ "{% for s in seq %}[{{ loop.first }}{% for c in s %}"
+ "|{{ loop.first }}{% endfor %}]{% endfor %}"
+ )
+ assert t.render(seq=("ab", "cd")) == "[True|True|False][False|True|False]"
+
+ def test_scoped_loop_var(self, test_env_async):
+ t = test_env_async.from_string(
+ "{% for x in seq %}{{ loop.first }}"
+ "{% for y in seq %}{% endfor %}{% endfor %}"
+ )
+ assert t.render(seq="ab") == "TrueFalse"
+ t = test_env_async.from_string(
+ "{% for x in seq %}{% for y in seq %}"
+ "{{ loop.first }}{% endfor %}{% endfor %}"
+ )
+ assert t.render(seq="ab") == "TrueFalseTrueFalse"
+
+ def test_recursive_empty_loop_iter(self, test_env_async):
+ t = test_env_async.from_string(
+ """
+ {%- for item in foo recursive -%}{%- endfor -%}
+ """
+ )
+ assert t.render(dict(foo=[])) == ""
+
+ def test_call_in_loop(self, test_env_async):
+ t = test_env_async.from_string(
+ """
+ {%- macro do_something() -%}
+ [{{ caller() }}]
+ {%- endmacro %}
+
+ {%- for i in [1, 2, 3] %}
+ {%- call do_something() -%}
+ {{ i }}
+ {%- endcall %}
+ {%- endfor -%}
+ """
+ )
+ assert t.render() == "[1][2][3]"
+
+ def test_scoping_bug(self, test_env_async):
+ t = test_env_async.from_string(
+ """
+ {%- for item in foo %}...{{ item }}...{% endfor %}
+ {%- macro item(a) %}...{{ a }}...{% endmacro %}
+ {{- item(2) -}}
+ """
+ )
+ assert t.render(foo=(1,)) == "...1......2..."
+
+ def test_unpacking(self, test_env_async):
+ tmpl = test_env_async.from_string(
+ "{% for a, b, c in [[1, 2, 3]] %}{{ a }}|{{ b }}|{{ c }}{% endfor %}"
+ )
+ assert tmpl.render() == "1|2|3"
+
+ def test_recursive_loop_filter(self, test_env_async):
+ t = test_env_async.from_string(
+ """
+ <?xml version="1.0" encoding="UTF-8"?>
+ <urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
+ {%- for page in [site.root] if page.url != this recursive %}
+ <url><loc>{{ page.url }}</loc></url>
+ {{- loop(page.children) }}
+ {%- endfor %}
+ </urlset>
+ """
+ )
+ sm = t.render(
+ this="/foo",
+ site={"root": {"url": "/", "children": [{"url": "/foo"}, {"url": "/bar"}]}},
+ )
+ lines = [x.strip() for x in sm.splitlines() if x.strip()]
+ assert lines == [
+ '<?xml version="1.0" encoding="UTF-8"?>',
+ '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">',
+ "<url><loc>/</loc></url>",
+ "<url><loc>/bar</loc></url>",
+ "</urlset>",
+ ]
+
+ def test_nonrecursive_loop_filter(self, test_env_async):
+ t = test_env_async.from_string(
+ """
+ <?xml version="1.0" encoding="UTF-8"?>
+ <urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
+ {%- for page in items if page.url != this %}
+ <url><loc>{{ page.url }}</loc></url>
+ {%- endfor %}
+ </urlset>
+ """
+ )
+ sm = t.render(
+ this="/foo", items=[{"url": "/"}, {"url": "/foo"}, {"url": "/bar"}]
+ )
+ lines = [x.strip() for x in sm.splitlines() if x.strip()]
+ assert lines == [
+ '<?xml version="1.0" encoding="UTF-8"?>',
+ '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">',
+ "<url><loc>/</loc></url>",
+ "<url><loc>/bar</loc></url>",
+ "</urlset>",
+ ]
+
+ def test_bare_async(self, test_env_async):
+ t = test_env_async.from_string('{% extends "header" %}')
+ assert t.render(foo=42) == "[42|23]"
+
+ def test_awaitable_property_slicing(self, test_env_async):
+ t = test_env_async.from_string("{% for x in a.b[:1] %}{{ x }}{% endfor %}")
+ assert t.render(a=dict(b=[1, 2, 3])) == "1"
+
+
+def test_namespace_awaitable(test_env_async):
+ async def _test():
+ t = test_env_async.from_string(
+ '{% set ns = namespace(foo="Bar") %}{{ ns.foo }}'
+ )
+ actual = await t.render_async()
+ assert actual == "Bar"
+
+ asyncio.run(_test())
+
+
+def test_chainable_undefined_aiter():
+ async def _test():
+ t = Template(
+ "{% for x in a['b']['c'] %}{{ x }}{% endfor %}",
+ enable_async=True,
+ undefined=ChainableUndefined,
+ )
+ rv = await t.render_async(a={})
+ assert rv == ""
+
+ asyncio.run(_test())
+
+
+@pytest.fixture
+def async_native_env():
+ return NativeEnvironment(enable_async=True)
+
+
+def test_native_async(async_native_env):
+ async def _test():
+ t = async_native_env.from_string("{{ x }}")
+ rv = await t.render_async(x=23)
+ assert rv == 23
+
+ asyncio.run(_test())
+
+
+def test_native_list_async(async_native_env):
+ async def _test():
+ t = async_native_env.from_string("{{ x }}")
+ rv = await t.render_async(x=list(range(3)))
+ assert rv == [0, 1, 2]
+
+ asyncio.run(_test())
+
+
+def test_getitem_after_filter():
+ env = Environment(enable_async=True)
+ env.filters["add_each"] = lambda v, x: [i + x for i in v]
+ t = env.from_string("{{ (a|add_each(2))[1:] }}")
+ out = t.render(a=range(3))
+ assert out == "[3, 4]"
+
+
+def test_getitem_after_call():
+ env = Environment(enable_async=True)
+ env.globals["add_each"] = lambda v, x: [i + x for i in v]
+ t = env.from_string("{{ add_each(a, 2)[1:] }}")
+ out = t.render(a=range(3))
+ assert out == "[3, 4]"