diff options
Diffstat (limited to 'third_party/python/Mako/mako/codegen.py')
-rw-r--r-- | third_party/python/Mako/mako/codegen.py | 1318 |
1 files changed, 1318 insertions, 0 deletions
diff --git a/third_party/python/Mako/mako/codegen.py b/third_party/python/Mako/mako/codegen.py new file mode 100644 index 0000000000..a9ae55b847 --- /dev/null +++ b/third_party/python/Mako/mako/codegen.py @@ -0,0 +1,1318 @@ +# mako/codegen.py +# Copyright 2006-2020 the Mako authors and contributors <see AUTHORS file> +# +# This module is part of Mako and is released under +# the MIT License: http://www.opensource.org/licenses/mit-license.php + +"""provides functionality for rendering a parsetree constructing into module +source code.""" + +import json +import re +import time + +from mako import ast +from mako import compat +from mako import exceptions +from mako import filters +from mako import parsetree +from mako import util +from mako.pygen import PythonPrinter + + +MAGIC_NUMBER = 10 + +# names which are hardwired into the +# template and are not accessed via the +# context itself +TOPLEVEL_DECLARED = set(["UNDEFINED", "STOP_RENDERING"]) +RESERVED_NAMES = set(["context", "loop"]).union(TOPLEVEL_DECLARED) + + +def compile( # noqa + node, + uri, + filename=None, + default_filters=None, + buffer_filters=None, + imports=None, + future_imports=None, + source_encoding=None, + generate_magic_comment=True, + disable_unicode=False, + strict_undefined=False, + enable_loop=True, + reserved_names=frozenset(), +): + """Generate module source code given a parsetree node, + uri, and optional source filename""" + + # if on Py2K, push the "source_encoding" string to be + # a bytestring itself, as we will be embedding it into + # the generated source and we don't want to coerce the + # result into a unicode object, in "disable_unicode" mode + if not compat.py3k and isinstance(source_encoding, compat.text_type): + source_encoding = source_encoding.encode(source_encoding) + + buf = util.FastEncodingBuffer() + + printer = PythonPrinter(buf) + _GenerateRenderMethod( + printer, + _CompileContext( + uri, + filename, + default_filters, + buffer_filters, + imports, + future_imports, + source_encoding, + generate_magic_comment, + disable_unicode, + strict_undefined, + enable_loop, + reserved_names, + ), + node, + ) + return buf.getvalue() + + +class _CompileContext(object): + def __init__( + self, + uri, + filename, + default_filters, + buffer_filters, + imports, + future_imports, + source_encoding, + generate_magic_comment, + disable_unicode, + strict_undefined, + enable_loop, + reserved_names, + ): + self.uri = uri + self.filename = filename + self.default_filters = default_filters + self.buffer_filters = buffer_filters + self.imports = imports + self.future_imports = future_imports + self.source_encoding = source_encoding + self.generate_magic_comment = generate_magic_comment + self.disable_unicode = disable_unicode + self.strict_undefined = strict_undefined + self.enable_loop = enable_loop + self.reserved_names = reserved_names + + +class _GenerateRenderMethod(object): + + """A template visitor object which generates the + full module source for a template. + + """ + + def __init__(self, printer, compiler, node): + self.printer = printer + self.compiler = compiler + self.node = node + self.identifier_stack = [None] + self.in_def = isinstance(node, (parsetree.DefTag, parsetree.BlockTag)) + + if self.in_def: + name = "render_%s" % node.funcname + args = node.get_argument_expressions() + filtered = len(node.filter_args.args) > 0 + buffered = eval(node.attributes.get("buffered", "False")) + cached = eval(node.attributes.get("cached", "False")) + defs = None + pagetag = None + if node.is_block and not node.is_anonymous: + args += ["**pageargs"] + else: + defs = self.write_toplevel() + pagetag = self.compiler.pagetag + name = "render_body" + if pagetag is not None: + args = pagetag.body_decl.get_argument_expressions() + if not pagetag.body_decl.kwargs: + args += ["**pageargs"] + cached = eval(pagetag.attributes.get("cached", "False")) + self.compiler.enable_loop = self.compiler.enable_loop or eval( + pagetag.attributes.get("enable_loop", "False") + ) + else: + args = ["**pageargs"] + cached = False + buffered = filtered = False + if args is None: + args = ["context"] + else: + args = [a for a in ["context"] + args] + + self.write_render_callable( + pagetag or node, name, args, buffered, filtered, cached + ) + + if defs is not None: + for node in defs: + _GenerateRenderMethod(printer, compiler, node) + + if not self.in_def: + self.write_metadata_struct() + + def write_metadata_struct(self): + self.printer.source_map[self.printer.lineno] = max( + self.printer.source_map + ) + struct = { + "filename": self.compiler.filename, + "uri": self.compiler.uri, + "source_encoding": self.compiler.source_encoding, + "line_map": self.printer.source_map, + } + self.printer.writelines( + '"""', + "__M_BEGIN_METADATA", + json.dumps(struct), + "__M_END_METADATA\n" '"""', + ) + + @property + def identifiers(self): + return self.identifier_stack[-1] + + def write_toplevel(self): + """Traverse a template structure for module-level directives and + generate the start of module-level code. + + """ + inherit = [] + namespaces = {} + module_code = [] + + self.compiler.pagetag = None + + class FindTopLevel(object): + def visitInheritTag(s, node): + inherit.append(node) + + def visitNamespaceTag(s, node): + namespaces[node.name] = node + + def visitPageTag(s, node): + self.compiler.pagetag = node + + def visitCode(s, node): + if node.ismodule: + module_code.append(node) + + f = FindTopLevel() + for n in self.node.nodes: + n.accept_visitor(f) + + self.compiler.namespaces = namespaces + + module_ident = set() + for n in module_code: + module_ident = module_ident.union(n.declared_identifiers()) + + module_identifiers = _Identifiers(self.compiler) + module_identifiers.declared = module_ident + + # module-level names, python code + if ( + self.compiler.generate_magic_comment + and self.compiler.source_encoding + ): + self.printer.writeline( + "# -*- coding:%s -*-" % self.compiler.source_encoding + ) + + if self.compiler.future_imports: + self.printer.writeline( + "from __future__ import %s" + % (", ".join(self.compiler.future_imports),) + ) + self.printer.writeline("from mako import runtime, filters, cache") + self.printer.writeline("UNDEFINED = runtime.UNDEFINED") + self.printer.writeline("STOP_RENDERING = runtime.STOP_RENDERING") + self.printer.writeline("__M_dict_builtin = dict") + self.printer.writeline("__M_locals_builtin = locals") + self.printer.writeline("_magic_number = %r" % MAGIC_NUMBER) + self.printer.writeline("_modified_time = %r" % time.time()) + self.printer.writeline("_enable_loop = %r" % self.compiler.enable_loop) + self.printer.writeline( + "_template_filename = %r" % self.compiler.filename + ) + self.printer.writeline("_template_uri = %r" % self.compiler.uri) + self.printer.writeline( + "_source_encoding = %r" % self.compiler.source_encoding + ) + if self.compiler.imports: + buf = "" + for imp in self.compiler.imports: + buf += imp + "\n" + self.printer.writeline(imp) + impcode = ast.PythonCode( + buf, + source="", + lineno=0, + pos=0, + filename="template defined imports", + ) + else: + impcode = None + + main_identifiers = module_identifiers.branch(self.node) + mit = module_identifiers.topleveldefs + module_identifiers.topleveldefs = mit.union( + main_identifiers.topleveldefs + ) + module_identifiers.declared.update(TOPLEVEL_DECLARED) + if impcode: + module_identifiers.declared.update(impcode.declared_identifiers) + + self.compiler.identifiers = module_identifiers + self.printer.writeline( + "_exports = %r" + % [n.name for n in main_identifiers.topleveldefs.values()] + ) + self.printer.write_blanks(2) + + if len(module_code): + self.write_module_code(module_code) + + if len(inherit): + self.write_namespaces(namespaces) + self.write_inherit(inherit[-1]) + elif len(namespaces): + self.write_namespaces(namespaces) + + return list(main_identifiers.topleveldefs.values()) + + def write_render_callable( + self, node, name, args, buffered, filtered, cached + ): + """write a top-level render callable. + + this could be the main render() method or that of a top-level def.""" + + if self.in_def: + decorator = node.decorator + if decorator: + self.printer.writeline( + "@runtime._decorate_toplevel(%s)" % decorator + ) + + self.printer.start_source(node.lineno) + self.printer.writelines( + "def %s(%s):" % (name, ",".join(args)), + # push new frame, assign current frame to __M_caller + "__M_caller = context.caller_stack._push_frame()", + "try:", + ) + if buffered or filtered or cached: + self.printer.writeline("context._push_buffer()") + + self.identifier_stack.append( + self.compiler.identifiers.branch(self.node) + ) + if (not self.in_def or self.node.is_block) and "**pageargs" in args: + self.identifier_stack[-1].argument_declared.add("pageargs") + + if not self.in_def and ( + len(self.identifiers.locally_assigned) > 0 + or len(self.identifiers.argument_declared) > 0 + ): + self.printer.writeline( + "__M_locals = __M_dict_builtin(%s)" + % ",".join( + [ + "%s=%s" % (x, x) + for x in self.identifiers.argument_declared + ] + ) + ) + + self.write_variable_declares(self.identifiers, toplevel=True) + + for n in self.node.nodes: + n.accept_visitor(self) + + self.write_def_finish(self.node, buffered, filtered, cached) + self.printer.writeline(None) + self.printer.write_blanks(2) + if cached: + self.write_cache_decorator( + node, name, args, buffered, self.identifiers, toplevel=True + ) + + def write_module_code(self, module_code): + """write module-level template code, i.e. that which + is enclosed in <%! %> tags in the template.""" + for n in module_code: + self.printer.write_indented_block(n.text, starting_lineno=n.lineno) + + def write_inherit(self, node): + """write the module-level inheritance-determination callable.""" + + self.printer.writelines( + "def _mako_inherit(template, context):", + "_mako_generate_namespaces(context)", + "return runtime._inherit_from(context, %s, _template_uri)" + % (node.parsed_attributes["file"]), + None, + ) + + def write_namespaces(self, namespaces): + """write the module-level namespace-generating callable.""" + self.printer.writelines( + "def _mako_get_namespace(context, name):", + "try:", + "return context.namespaces[(__name__, name)]", + "except KeyError:", + "_mako_generate_namespaces(context)", + "return context.namespaces[(__name__, name)]", + None, + None, + ) + self.printer.writeline("def _mako_generate_namespaces(context):") + + for node in namespaces.values(): + if "import" in node.attributes: + self.compiler.has_ns_imports = True + self.printer.start_source(node.lineno) + if len(node.nodes): + self.printer.writeline("def make_namespace():") + export = [] + identifiers = self.compiler.identifiers.branch(node) + self.in_def = True + + class NSDefVisitor(object): + def visitDefTag(s, node): + s.visitDefOrBase(node) + + def visitBlockTag(s, node): + s.visitDefOrBase(node) + + def visitDefOrBase(s, node): + if node.is_anonymous: + raise exceptions.CompileException( + "Can't put anonymous blocks inside " + "<%namespace>", + **node.exception_kwargs + ) + self.write_inline_def(node, identifiers, nested=False) + export.append(node.funcname) + + vis = NSDefVisitor() + for n in node.nodes: + n.accept_visitor(vis) + self.printer.writeline("return [%s]" % (",".join(export))) + self.printer.writeline(None) + self.in_def = False + callable_name = "make_namespace()" + else: + callable_name = "None" + + if "file" in node.parsed_attributes: + self.printer.writeline( + "ns = runtime.TemplateNamespace(%r," + " context._clean_inheritance_tokens()," + " templateuri=%s, callables=%s, " + " calling_uri=_template_uri)" + % ( + node.name, + node.parsed_attributes.get("file", "None"), + callable_name, + ) + ) + elif "module" in node.parsed_attributes: + self.printer.writeline( + "ns = runtime.ModuleNamespace(%r," + " context._clean_inheritance_tokens()," + " callables=%s, calling_uri=_template_uri," + " module=%s)" + % ( + node.name, + callable_name, + node.parsed_attributes.get("module", "None"), + ) + ) + else: + self.printer.writeline( + "ns = runtime.Namespace(%r," + " context._clean_inheritance_tokens()," + " callables=%s, calling_uri=_template_uri)" + % (node.name, callable_name) + ) + if eval(node.attributes.get("inheritable", "False")): + self.printer.writeline("context['self'].%s = ns" % (node.name)) + + self.printer.writeline( + "context.namespaces[(__name__, %s)] = ns" % repr(node.name) + ) + self.printer.write_blanks(1) + if not len(namespaces): + self.printer.writeline("pass") + self.printer.writeline(None) + + def write_variable_declares(self, identifiers, toplevel=False, limit=None): + """write variable declarations at the top of a function. + + the variable declarations are in the form of callable + definitions for defs and/or name lookup within the + function's context argument. the names declared are based + on the names that are referenced in the function body, + which don't otherwise have any explicit assignment + operation. names that are assigned within the body are + assumed to be locally-scoped variables and are not + separately declared. + + for def callable definitions, if the def is a top-level + callable then a 'stub' callable is generated which wraps + the current Context into a closure. if the def is not + top-level, it is fully rendered as a local closure. + + """ + + # collection of all defs available to us in this scope + comp_idents = dict([(c.funcname, c) for c in identifiers.defs]) + to_write = set() + + # write "context.get()" for all variables we are going to + # need that arent in the namespace yet + to_write = to_write.union(identifiers.undeclared) + + # write closure functions for closures that we define + # right here + to_write = to_write.union( + [c.funcname for c in identifiers.closuredefs.values()] + ) + + # remove identifiers that are declared in the argument + # signature of the callable + to_write = to_write.difference(identifiers.argument_declared) + + # remove identifiers that we are going to assign to. + # in this way we mimic Python's behavior, + # i.e. assignment to a variable within a block + # means that variable is now a "locally declared" var, + # which cannot be referenced beforehand. + to_write = to_write.difference(identifiers.locally_declared) + + if self.compiler.enable_loop: + has_loop = "loop" in to_write + to_write.discard("loop") + else: + has_loop = False + + # if a limiting set was sent, constraint to those items in that list + # (this is used for the caching decorator) + if limit is not None: + to_write = to_write.intersection(limit) + + if toplevel and getattr(self.compiler, "has_ns_imports", False): + self.printer.writeline("_import_ns = {}") + self.compiler.has_imports = True + for ident, ns in self.compiler.namespaces.items(): + if "import" in ns.attributes: + self.printer.writeline( + "_mako_get_namespace(context, %r)." + "_populate(_import_ns, %r)" + % ( + ident, + re.split(r"\s*,\s*", ns.attributes["import"]), + ) + ) + + if has_loop: + self.printer.writeline("loop = __M_loop = runtime.LoopStack()") + + for ident in to_write: + if ident in comp_idents: + comp = comp_idents[ident] + if comp.is_block: + if not comp.is_anonymous: + self.write_def_decl(comp, identifiers) + else: + self.write_inline_def(comp, identifiers, nested=True) + else: + if comp.is_root(): + self.write_def_decl(comp, identifiers) + else: + self.write_inline_def(comp, identifiers, nested=True) + + elif ident in self.compiler.namespaces: + self.printer.writeline( + "%s = _mako_get_namespace(context, %r)" % (ident, ident) + ) + else: + if getattr(self.compiler, "has_ns_imports", False): + if self.compiler.strict_undefined: + self.printer.writelines( + "%s = _import_ns.get(%r, UNDEFINED)" + % (ident, ident), + "if %s is UNDEFINED:" % ident, + "try:", + "%s = context[%r]" % (ident, ident), + "except KeyError:", + "raise NameError(\"'%s' is not defined\")" % ident, + None, + None, + ) + else: + self.printer.writeline( + "%s = _import_ns.get" + "(%r, context.get(%r, UNDEFINED))" + % (ident, ident, ident) + ) + else: + if self.compiler.strict_undefined: + self.printer.writelines( + "try:", + "%s = context[%r]" % (ident, ident), + "except KeyError:", + "raise NameError(\"'%s' is not defined\")" % ident, + None, + ) + else: + self.printer.writeline( + "%s = context.get(%r, UNDEFINED)" % (ident, ident) + ) + + self.printer.writeline("__M_writer = context.writer()") + + def write_def_decl(self, node, identifiers): + """write a locally-available callable referencing a top-level def""" + funcname = node.funcname + namedecls = node.get_argument_expressions() + nameargs = node.get_argument_expressions(as_call=True) + + if not self.in_def and ( + len(self.identifiers.locally_assigned) > 0 + or len(self.identifiers.argument_declared) > 0 + ): + nameargs.insert(0, "context._locals(__M_locals)") + else: + nameargs.insert(0, "context") + self.printer.writeline("def %s(%s):" % (funcname, ",".join(namedecls))) + self.printer.writeline( + "return render_%s(%s)" % (funcname, ",".join(nameargs)) + ) + self.printer.writeline(None) + + def write_inline_def(self, node, identifiers, nested): + """write a locally-available def callable inside an enclosing def.""" + + namedecls = node.get_argument_expressions() + + decorator = node.decorator + if decorator: + self.printer.writeline( + "@runtime._decorate_inline(context, %s)" % decorator + ) + self.printer.writeline( + "def %s(%s):" % (node.funcname, ",".join(namedecls)) + ) + filtered = len(node.filter_args.args) > 0 + buffered = eval(node.attributes.get("buffered", "False")) + cached = eval(node.attributes.get("cached", "False")) + self.printer.writelines( + # push new frame, assign current frame to __M_caller + "__M_caller = context.caller_stack._push_frame()", + "try:", + ) + if buffered or filtered or cached: + self.printer.writelines("context._push_buffer()") + + identifiers = identifiers.branch(node, nested=nested) + + self.write_variable_declares(identifiers) + + self.identifier_stack.append(identifiers) + for n in node.nodes: + n.accept_visitor(self) + self.identifier_stack.pop() + + self.write_def_finish(node, buffered, filtered, cached) + self.printer.writeline(None) + if cached: + self.write_cache_decorator( + node, + node.funcname, + namedecls, + False, + identifiers, + inline=True, + toplevel=False, + ) + + def write_def_finish( + self, node, buffered, filtered, cached, callstack=True + ): + """write the end section of a rendering function, either outermost or + inline. + + this takes into account if the rendering function was filtered, + buffered, etc. and closes the corresponding try: block if any, and + writes code to retrieve captured content, apply filters, send proper + return value.""" + + if not buffered and not cached and not filtered: + self.printer.writeline("return ''") + if callstack: + self.printer.writelines( + "finally:", "context.caller_stack._pop_frame()", None + ) + + if buffered or filtered or cached: + if buffered or cached: + # in a caching scenario, don't try to get a writer + # from the context after popping; assume the caching + # implemenation might be using a context with no + # extra buffers + self.printer.writelines( + "finally:", "__M_buf = context._pop_buffer()" + ) + else: + self.printer.writelines( + "finally:", + "__M_buf, __M_writer = context._pop_buffer_and_writer()", + ) + + if callstack: + self.printer.writeline("context.caller_stack._pop_frame()") + + s = "__M_buf.getvalue()" + if filtered: + s = self.create_filter_callable( + node.filter_args.args, s, False + ) + self.printer.writeline(None) + if buffered and not cached: + s = self.create_filter_callable( + self.compiler.buffer_filters, s, False + ) + if buffered or cached: + self.printer.writeline("return %s" % s) + else: + self.printer.writelines("__M_writer(%s)" % s, "return ''") + + def write_cache_decorator( + self, + node_or_pagetag, + name, + args, + buffered, + identifiers, + inline=False, + toplevel=False, + ): + """write a post-function decorator to replace a rendering + callable with a cached version of itself.""" + + self.printer.writeline("__M_%s = %s" % (name, name)) + cachekey = node_or_pagetag.parsed_attributes.get( + "cache_key", repr(name) + ) + + cache_args = {} + if self.compiler.pagetag is not None: + cache_args.update( + (pa[6:], self.compiler.pagetag.parsed_attributes[pa]) + for pa in self.compiler.pagetag.parsed_attributes + if pa.startswith("cache_") and pa != "cache_key" + ) + cache_args.update( + (pa[6:], node_or_pagetag.parsed_attributes[pa]) + for pa in node_or_pagetag.parsed_attributes + if pa.startswith("cache_") and pa != "cache_key" + ) + if "timeout" in cache_args: + cache_args["timeout"] = int(eval(cache_args["timeout"])) + + self.printer.writeline("def %s(%s):" % (name, ",".join(args))) + + # form "arg1, arg2, arg3=arg3, arg4=arg4", etc. + pass_args = [ + "%s=%s" % ((a.split("=")[0],) * 2) if "=" in a else a for a in args + ] + + self.write_variable_declares( + identifiers, + toplevel=toplevel, + limit=node_or_pagetag.undeclared_identifiers(), + ) + if buffered: + s = ( + "context.get('local')." + "cache._ctx_get_or_create(" + "%s, lambda:__M_%s(%s), context, %s__M_defname=%r)" + % ( + cachekey, + name, + ",".join(pass_args), + "".join( + ["%s=%s, " % (k, v) for k, v in cache_args.items()] + ), + name, + ) + ) + # apply buffer_filters + s = self.create_filter_callable( + self.compiler.buffer_filters, s, False + ) + self.printer.writelines("return " + s, None) + else: + self.printer.writelines( + "__M_writer(context.get('local')." + "cache._ctx_get_or_create(" + "%s, lambda:__M_%s(%s), context, %s__M_defname=%r))" + % ( + cachekey, + name, + ",".join(pass_args), + "".join( + ["%s=%s, " % (k, v) for k, v in cache_args.items()] + ), + name, + ), + "return ''", + None, + ) + + def create_filter_callable(self, args, target, is_expression): + """write a filter-applying expression based on the filters + present in the given filter names, adjusting for the global + 'default' filter aliases as needed.""" + + def locate_encode(name): + if re.match(r"decode\..+", name): + return "filters." + name + elif self.compiler.disable_unicode: + return filters.NON_UNICODE_ESCAPES.get(name, name) + else: + return filters.DEFAULT_ESCAPES.get(name, name) + + if "n" not in args: + if is_expression: + if self.compiler.pagetag: + args = self.compiler.pagetag.filter_args.args + args + if self.compiler.default_filters and "n" not in args: + args = self.compiler.default_filters + args + for e in args: + # if filter given as a function, get just the identifier portion + if e == "n": + continue + m = re.match(r"(.+?)(\(.*\))", e) + if m: + ident, fargs = m.group(1, 2) + f = locate_encode(ident) + e = f + fargs + else: + e = locate_encode(e) + assert e is not None + target = "%s(%s)" % (e, target) + return target + + def visitExpression(self, node): + self.printer.start_source(node.lineno) + if ( + len(node.escapes) + or ( + self.compiler.pagetag is not None + and len(self.compiler.pagetag.filter_args.args) + ) + or len(self.compiler.default_filters) + ): + + s = self.create_filter_callable( + node.escapes_code.args, "%s" % node.text, True + ) + self.printer.writeline("__M_writer(%s)" % s) + else: + self.printer.writeline("__M_writer(%s)" % node.text) + + def visitControlLine(self, node): + if node.isend: + self.printer.writeline(None) + if node.has_loop_context: + self.printer.writeline("finally:") + self.printer.writeline("loop = __M_loop._exit()") + self.printer.writeline(None) + else: + self.printer.start_source(node.lineno) + if self.compiler.enable_loop and node.keyword == "for": + text = mangle_mako_loop(node, self.printer) + else: + text = node.text + self.printer.writeline(text) + children = node.get_children() + # this covers the three situations where we want to insert a pass: + # 1) a ternary control line with no children, + # 2) a primary control line with nothing but its own ternary + # and end control lines, and + # 3) any control line with no content other than comments + if not children or ( + compat.all( + isinstance(c, (parsetree.Comment, parsetree.ControlLine)) + for c in children + ) + and compat.all( + (node.is_ternary(c.keyword) or c.isend) + for c in children + if isinstance(c, parsetree.ControlLine) + ) + ): + self.printer.writeline("pass") + + def visitText(self, node): + self.printer.start_source(node.lineno) + self.printer.writeline("__M_writer(%s)" % repr(node.content)) + + def visitTextTag(self, node): + filtered = len(node.filter_args.args) > 0 + if filtered: + self.printer.writelines( + "__M_writer = context._push_writer()", "try:" + ) + for n in node.nodes: + n.accept_visitor(self) + if filtered: + self.printer.writelines( + "finally:", + "__M_buf, __M_writer = context._pop_buffer_and_writer()", + "__M_writer(%s)" + % self.create_filter_callable( + node.filter_args.args, "__M_buf.getvalue()", False + ), + None, + ) + + def visitCode(self, node): + if not node.ismodule: + self.printer.write_indented_block( + node.text, starting_lineno=node.lineno + ) + + if not self.in_def and len(self.identifiers.locally_assigned) > 0: + # if we are the "template" def, fudge locally + # declared/modified variables into the "__M_locals" dictionary, + # which is used for def calls within the same template, + # to simulate "enclosing scope" + self.printer.writeline( + "__M_locals_builtin_stored = __M_locals_builtin()" + ) + self.printer.writeline( + "__M_locals.update(__M_dict_builtin([(__M_key," + " __M_locals_builtin_stored[__M_key]) for __M_key in" + " [%s] if __M_key in __M_locals_builtin_stored]))" + % ",".join([repr(x) for x in node.declared_identifiers()]) + ) + + def visitIncludeTag(self, node): + self.printer.start_source(node.lineno) + args = node.attributes.get("args") + if args: + self.printer.writeline( + "runtime._include_file(context, %s, _template_uri, %s)" + % (node.parsed_attributes["file"], args) + ) + else: + self.printer.writeline( + "runtime._include_file(context, %s, _template_uri)" + % (node.parsed_attributes["file"]) + ) + + def visitNamespaceTag(self, node): + pass + + def visitDefTag(self, node): + pass + + def visitBlockTag(self, node): + if node.is_anonymous: + self.printer.writeline("%s()" % node.funcname) + else: + nameargs = node.get_argument_expressions(as_call=True) + nameargs += ["**pageargs"] + self.printer.writeline( + "if 'parent' not in context._data or " + "not hasattr(context._data['parent'], '%s'):" % node.funcname + ) + self.printer.writeline( + "context['self'].%s(%s)" % (node.funcname, ",".join(nameargs)) + ) + self.printer.writeline("\n") + + def visitCallNamespaceTag(self, node): + # TODO: we can put namespace-specific checks here, such + # as ensure the given namespace will be imported, + # pre-import the namespace, etc. + self.visitCallTag(node) + + def visitCallTag(self, node): + self.printer.writeline("def ccall(caller):") + export = ["body"] + callable_identifiers = self.identifiers.branch(node, nested=True) + body_identifiers = callable_identifiers.branch(node, nested=False) + # we want the 'caller' passed to ccall to be used + # for the body() function, but for other non-body() + # <%def>s within <%call> we want the current caller + # off the call stack (if any) + body_identifiers.add_declared("caller") + + self.identifier_stack.append(body_identifiers) + + class DefVisitor(object): + def visitDefTag(s, node): + s.visitDefOrBase(node) + + def visitBlockTag(s, node): + s.visitDefOrBase(node) + + def visitDefOrBase(s, node): + self.write_inline_def(node, callable_identifiers, nested=False) + if not node.is_anonymous: + export.append(node.funcname) + # remove defs that are within the <%call> from the + # "closuredefs" defined in the body, so they dont render twice + if node.funcname in body_identifiers.closuredefs: + del body_identifiers.closuredefs[node.funcname] + + vis = DefVisitor() + for n in node.nodes: + n.accept_visitor(vis) + self.identifier_stack.pop() + + bodyargs = node.body_decl.get_argument_expressions() + self.printer.writeline("def body(%s):" % ",".join(bodyargs)) + + # TODO: figure out best way to specify + # buffering/nonbuffering (at call time would be better) + buffered = False + if buffered: + self.printer.writelines("context._push_buffer()", "try:") + self.write_variable_declares(body_identifiers) + self.identifier_stack.append(body_identifiers) + + for n in node.nodes: + n.accept_visitor(self) + self.identifier_stack.pop() + + self.write_def_finish(node, buffered, False, False, callstack=False) + self.printer.writelines(None, "return [%s]" % (",".join(export)), None) + + self.printer.writelines( + # push on caller for nested call + "context.caller_stack.nextcaller = " + "runtime.Namespace('caller', context, " + "callables=ccall(__M_caller))", + "try:", + ) + self.printer.start_source(node.lineno) + self.printer.writelines( + "__M_writer(%s)" + % self.create_filter_callable([], node.expression, True), + "finally:", + "context.caller_stack.nextcaller = None", + None, + ) + + +class _Identifiers(object): + + """tracks the status of identifier names as template code is rendered.""" + + def __init__(self, compiler, node=None, parent=None, nested=False): + if parent is not None: + # if we are the branch created in write_namespaces(), + # we don't share any context from the main body(). + if isinstance(node, parsetree.NamespaceTag): + self.declared = set() + self.topleveldefs = util.SetLikeDict() + else: + # things that have already been declared + # in an enclosing namespace (i.e. names we can just use) + self.declared = ( + set(parent.declared) + .union([c.name for c in parent.closuredefs.values()]) + .union(parent.locally_declared) + .union(parent.argument_declared) + ) + + # if these identifiers correspond to a "nested" + # scope, it means whatever the parent identifiers + # had as undeclared will have been declared by that parent, + # and therefore we have them in our scope. + if nested: + self.declared = self.declared.union(parent.undeclared) + + # top level defs that are available + self.topleveldefs = util.SetLikeDict(**parent.topleveldefs) + else: + self.declared = set() + self.topleveldefs = util.SetLikeDict() + + self.compiler = compiler + + # things within this level that are referenced before they + # are declared (e.g. assigned to) + self.undeclared = set() + + # things that are declared locally. some of these things + # could be in the "undeclared" list as well if they are + # referenced before declared + self.locally_declared = set() + + # assignments made in explicit python blocks. + # these will be propagated to + # the context of local def calls. + self.locally_assigned = set() + + # things that are declared in the argument + # signature of the def callable + self.argument_declared = set() + + # closure defs that are defined in this level + self.closuredefs = util.SetLikeDict() + + self.node = node + + if node is not None: + node.accept_visitor(self) + + illegal_names = self.compiler.reserved_names.intersection( + self.locally_declared + ) + if illegal_names: + raise exceptions.NameConflictError( + "Reserved words declared in template: %s" + % ", ".join(illegal_names) + ) + + def branch(self, node, **kwargs): + """create a new Identifiers for a new Node, with + this Identifiers as the parent.""" + + return _Identifiers(self.compiler, node, self, **kwargs) + + @property + def defs(self): + return set(self.topleveldefs.union(self.closuredefs).values()) + + def __repr__(self): + return ( + "Identifiers(declared=%r, locally_declared=%r, " + "undeclared=%r, topleveldefs=%r, closuredefs=%r, " + "argumentdeclared=%r)" + % ( + list(self.declared), + list(self.locally_declared), + list(self.undeclared), + [c.name for c in self.topleveldefs.values()], + [c.name for c in self.closuredefs.values()], + self.argument_declared, + ) + ) + + def check_declared(self, node): + """update the state of this Identifiers with the undeclared + and declared identifiers of the given node.""" + + for ident in node.undeclared_identifiers(): + if ident != "context" and ident not in self.declared.union( + self.locally_declared + ): + self.undeclared.add(ident) + for ident in node.declared_identifiers(): + self.locally_declared.add(ident) + + def add_declared(self, ident): + self.declared.add(ident) + if ident in self.undeclared: + self.undeclared.remove(ident) + + def visitExpression(self, node): + self.check_declared(node) + + def visitControlLine(self, node): + self.check_declared(node) + + def visitCode(self, node): + if not node.ismodule: + self.check_declared(node) + self.locally_assigned = self.locally_assigned.union( + node.declared_identifiers() + ) + + def visitNamespaceTag(self, node): + # only traverse into the sub-elements of a + # <%namespace> tag if we are the branch created in + # write_namespaces() + if self.node is node: + for n in node.nodes: + n.accept_visitor(self) + + def _check_name_exists(self, collection, node): + existing = collection.get(node.funcname) + collection[node.funcname] = node + if ( + existing is not None + and existing is not node + and (node.is_block or existing.is_block) + ): + raise exceptions.CompileException( + "%%def or %%block named '%s' already " + "exists in this template." % node.funcname, + **node.exception_kwargs + ) + + def visitDefTag(self, node): + if node.is_root() and not node.is_anonymous: + self._check_name_exists(self.topleveldefs, node) + elif node is not self.node: + self._check_name_exists(self.closuredefs, node) + + for ident in node.undeclared_identifiers(): + if ident != "context" and ident not in self.declared.union( + self.locally_declared + ): + self.undeclared.add(ident) + + # visit defs only one level deep + if node is self.node: + for ident in node.declared_identifiers(): + self.argument_declared.add(ident) + + for n in node.nodes: + n.accept_visitor(self) + + def visitBlockTag(self, node): + if node is not self.node and not node.is_anonymous: + + if isinstance(self.node, parsetree.DefTag): + raise exceptions.CompileException( + "Named block '%s' not allowed inside of def '%s'" + % (node.name, self.node.name), + **node.exception_kwargs + ) + elif isinstance( + self.node, (parsetree.CallTag, parsetree.CallNamespaceTag) + ): + raise exceptions.CompileException( + "Named block '%s' not allowed inside of <%%call> tag" + % (node.name,), + **node.exception_kwargs + ) + + for ident in node.undeclared_identifiers(): + if ident != "context" and ident not in self.declared.union( + self.locally_declared + ): + self.undeclared.add(ident) + + if not node.is_anonymous: + self._check_name_exists(self.topleveldefs, node) + self.undeclared.add(node.funcname) + elif node is not self.node: + self._check_name_exists(self.closuredefs, node) + for ident in node.declared_identifiers(): + self.argument_declared.add(ident) + for n in node.nodes: + n.accept_visitor(self) + + def visitTextTag(self, node): + for ident in node.undeclared_identifiers(): + if ident != "context" and ident not in self.declared.union( + self.locally_declared + ): + self.undeclared.add(ident) + + def visitIncludeTag(self, node): + self.check_declared(node) + + def visitPageTag(self, node): + for ident in node.declared_identifiers(): + self.argument_declared.add(ident) + self.check_declared(node) + + def visitCallNamespaceTag(self, node): + self.visitCallTag(node) + + def visitCallTag(self, node): + if node is self.node: + for ident in node.undeclared_identifiers(): + if ident != "context" and ident not in self.declared.union( + self.locally_declared + ): + self.undeclared.add(ident) + for ident in node.declared_identifiers(): + self.argument_declared.add(ident) + for n in node.nodes: + n.accept_visitor(self) + else: + for ident in node.undeclared_identifiers(): + if ident != "context" and ident not in self.declared.union( + self.locally_declared + ): + self.undeclared.add(ident) + + +_FOR_LOOP = re.compile( + r"^for\s+((?:\(?)\s*[A-Za-z_][A-Za-z_0-9]*" + r"(?:\s*,\s*(?:[A-Za-z_][A-Za-z0-9_]*),??)*\s*(?:\)?))\s+in\s+(.*):" +) + + +def mangle_mako_loop(node, printer): + """converts a for loop into a context manager wrapped around a for loop + when access to the `loop` variable has been detected in the for loop body + """ + loop_variable = LoopVariable() + node.accept_visitor(loop_variable) + if loop_variable.detected: + node.nodes[-1].has_loop_context = True + match = _FOR_LOOP.match(node.text) + if match: + printer.writelines( + "loop = __M_loop._enter(%s)" % match.group(2), + "try:" + # 'with __M_loop(%s) as loop:' % match.group(2) + ) + text = "for %s in loop:" % match.group(1) + else: + raise SyntaxError("Couldn't apply loop context: %s" % node.text) + else: + text = node.text + return text + + +class LoopVariable(object): + + """A node visitor which looks for the name 'loop' within undeclared + identifiers.""" + + def __init__(self): + self.detected = False + + def _loop_reference_detected(self, node): + if "loop" in node.undeclared_identifiers(): + self.detected = True + else: + for n in node.get_children(): + n.accept_visitor(self) + + def visitControlLine(self, node): + self._loop_reference_detected(node) + + def visitCode(self, node): + self._loop_reference_detected(node) + + def visitExpression(self, node): + self._loop_reference_detected(node) |