From 7b6e527f440cd7e6f8be2b07cee320ee6ca18786 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 29 Apr 2024 06:41:38 +0200 Subject: Adding upstream version 1.0.1. Signed-off-by: Daniel Baumann --- mesonbuild/rewriter.py | 1067 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1067 insertions(+) create mode 100644 mesonbuild/rewriter.py (limited to 'mesonbuild/rewriter.py') diff --git a/mesonbuild/rewriter.py b/mesonbuild/rewriter.py new file mode 100644 index 0000000..1497d93 --- /dev/null +++ b/mesonbuild/rewriter.py @@ -0,0 +1,1067 @@ +#!/usr/bin/env python3 +# Copyright 2016 The Meson development team + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This class contains the basic functionality needed to run any interpreter +# or an interpreter-based tool. + +# This tool is used to manipulate an existing Meson build definition. +# +# - add a file to a target +# - remove files from a target +# - move targets +# - reindent? +from __future__ import annotations + +from .ast import IntrospectionInterpreter, BUILD_TARGET_FUNCTIONS, AstConditionLevel, AstIDGenerator, AstIndentationGenerator, AstPrinter +from mesonbuild.mesonlib import MesonException +from . import mlog, environment +from functools import wraps +from .mparser import Token, ArrayNode, ArgumentNode, AssignmentNode, BooleanNode, ElementaryNode, IdNode, FunctionNode, StringNode +import json, os, re, sys +import typing as T + +if T.TYPE_CHECKING: + from .mparser import BaseNode + +class RewriterException(MesonException): + pass + +def add_arguments(parser, formatter=None): + parser.add_argument('-s', '--sourcedir', type=str, default='.', metavar='SRCDIR', help='Path to source directory.') + parser.add_argument('-V', '--verbose', action='store_true', default=False, help='Enable verbose output') + parser.add_argument('-S', '--skip-errors', dest='skip', action='store_true', default=False, help='Skip errors instead of aborting') + subparsers = parser.add_subparsers(dest='type', title='Rewriter commands', description='Rewrite command to execute') + + # Target + tgt_parser = subparsers.add_parser('target', help='Modify a target', formatter_class=formatter) + tgt_parser.add_argument('-s', '--subdir', default='', dest='subdir', help='Subdirectory of the new target (only for the "add_target" action)') + tgt_parser.add_argument('--type', dest='tgt_type', choices=rewriter_keys['target']['target_type'][2], default='executable', + help='Type of the target to add (only for the "add_target" action)') + tgt_parser.add_argument('target', help='Name or ID of the target') + tgt_parser.add_argument('operation', choices=['add', 'rm', 'add_target', 'rm_target', 'add_extra_files', 'rm_extra_files', 'info'], + help='Action to execute') + tgt_parser.add_argument('sources', nargs='*', help='Sources to add/remove') + + # KWARGS + kw_parser = subparsers.add_parser('kwargs', help='Modify keyword arguments', formatter_class=formatter) + kw_parser.add_argument('operation', choices=rewriter_keys['kwargs']['operation'][2], + help='Action to 
execute') + kw_parser.add_argument('function', choices=list(rewriter_func_kwargs.keys()), + help='Function type to modify') + kw_parser.add_argument('id', help='ID of the function to modify (can be anything for "project")') + kw_parser.add_argument('kwargs', nargs='*', help='Pairs of keyword and value') + + # Default options + def_parser = subparsers.add_parser('default-options', help='Modify the project default options', formatter_class=formatter) + def_parser.add_argument('operation', choices=rewriter_keys['default_options']['operation'][2], + help='Action to execute') + def_parser.add_argument('options', nargs='*', help='Key, value pairs of configuration option') + + # JSON file/command + cmd_parser = subparsers.add_parser('command', help='Execute a JSON array of commands', formatter_class=formatter) + cmd_parser.add_argument('json', help='JSON string or file to execute') + +class RequiredKeys: + def __init__(self, keys): + self.keys = keys + + def __call__(self, f): + @wraps(f) + def wrapped(*wrapped_args, **wrapped_kwargs): + assert len(wrapped_args) >= 2 + cmd = wrapped_args[1] + for key, val in self.keys.items(): + typ = val[0] # The type of the value + default = val[1] # The default value -- None is required + choices = val[2] # Valid choices -- None is for everything + if key not in cmd: + if default is not None: + cmd[key] = default + else: + raise RewriterException('Key "{}" is missing in object for {}' + .format(key, f.__name__)) + if not isinstance(cmd[key], typ): + raise RewriterException('Invalid type of "{}". Required is {} but provided was {}' + .format(key, typ.__name__, type(cmd[key]).__name__)) + if choices is not None: + assert isinstance(choices, list) + if cmd[key] not in choices: + raise RewriterException('Invalid value of "{}": Possible values are {} but provided was "{}"' + .format(key, choices, cmd[key])) + return f(*wrapped_args, **wrapped_kwargs) + + return wrapped + +class MTypeBase: + def __init__(self, node: T.Optional[BaseNode] = None): + if node is None: + self.node = self._new_node() # lgtm [py/init-calls-subclass] (node creation does not depend on base class state) + else: + self.node = node + self.node_type = None + for i in self.supported_nodes(): # lgtm [py/init-calls-subclass] (listing nodes does not depend on base class state) + if isinstance(self.node, i): + self.node_type = i + + def _new_node(self): + # Overwrite in derived class + raise RewriterException('Internal error: _new_node of MTypeBase was called') + + def can_modify(self): + return self.node_type is not None + + def get_node(self): + return self.node + + def supported_nodes(self): + # Overwrite in derived class + return [] + + def set_value(self, value): + # Overwrite in derived class + mlog.warning('Cannot set the value of type', mlog.bold(type(self).__name__), '--> skipping') + + def add_value(self, value): + # Overwrite in derived class + mlog.warning('Cannot add a value of type', mlog.bold(type(self).__name__), '--> skipping') + + def remove_value(self, value): + # Overwrite in derived class + mlog.warning('Cannot remove a value of type', mlog.bold(type(self).__name__), '--> skipping') + + def remove_regex(self, value): + # Overwrite in derived class + mlog.warning('Cannot remove a regex in type', mlog.bold(type(self).__name__), '--> skipping') + +class MTypeStr(MTypeBase): + def __init__(self, node: T.Optional[BaseNode] = None): + super().__init__(node) + + def _new_node(self): + return StringNode(Token('', '', 0, 0, 0, None, '')) + + def supported_nodes(self): + return 
[StringNode] + + def set_value(self, value): + self.node.value = str(value) + +class MTypeBool(MTypeBase): + def __init__(self, node: T.Optional[BaseNode] = None): + super().__init__(node) + + def _new_node(self): + return BooleanNode(Token('', '', 0, 0, 0, None, False)) + + def supported_nodes(self): + return [BooleanNode] + + def set_value(self, value): + self.node.value = bool(value) + +class MTypeID(MTypeBase): + def __init__(self, node: T.Optional[BaseNode] = None): + super().__init__(node) + + def _new_node(self): + return IdNode(Token('', '', 0, 0, 0, None, '')) + + def supported_nodes(self): + return [IdNode] + + def set_value(self, value): + self.node.value = str(value) + +class MTypeList(MTypeBase): + def __init__(self, node: T.Optional[BaseNode] = None): + super().__init__(node) + + def _new_node(self): + return ArrayNode(ArgumentNode(Token('', '', 0, 0, 0, None, '')), 0, 0, 0, 0) + + def _new_element_node(self, value): + # Overwrite in derived class + raise RewriterException('Internal error: _new_element_node of MTypeList was called') + + def _ensure_array_node(self): + if not isinstance(self.node, ArrayNode): + tmp = self.node + self.node = self._new_node() + self.node.args.arguments += [tmp] + + def _check_is_equal(self, node, value) -> bool: + # Overwrite in derived class + return False + + def _check_regex_matches(self, node, regex: str) -> bool: + # Overwrite in derived class + return False + + def get_node(self): + if isinstance(self.node, ArrayNode): + if len(self.node.args.arguments) == 1: + return self.node.args.arguments[0] + return self.node + + def supported_element_nodes(self): + # Overwrite in derived class + return [] + + def supported_nodes(self): + return [ArrayNode] + self.supported_element_nodes() + + def set_value(self, value): + if not isinstance(value, list): + value = [value] + self._ensure_array_node() + self.node.args.arguments = [] # Remove all current nodes + for i in value: + self.node.args.arguments += [self._new_element_node(i)] + + def add_value(self, value): + if not isinstance(value, list): + value = [value] + self._ensure_array_node() + for i in value: + self.node.args.arguments += [self._new_element_node(i)] + + def _remove_helper(self, value, equal_func): + def check_remove_node(node): + for j in value: + if equal_func(i, j): + return True + return False + + if not isinstance(value, list): + value = [value] + self._ensure_array_node() + removed_list = [] + for i in self.node.args.arguments: + if not check_remove_node(i): + removed_list += [i] + self.node.args.arguments = removed_list + + def remove_value(self, value): + self._remove_helper(value, self._check_is_equal) + + def remove_regex(self, regex: str): + self._remove_helper(regex, self._check_regex_matches) + +class MTypeStrList(MTypeList): + def __init__(self, node: T.Optional[BaseNode] = None): + super().__init__(node) + + def _new_element_node(self, value): + return StringNode(Token('', '', 0, 0, 0, None, str(value))) + + def _check_is_equal(self, node, value) -> bool: + if isinstance(node, StringNode): + return node.value == value + return False + + def _check_regex_matches(self, node, regex: str) -> bool: + if isinstance(node, StringNode): + return re.match(regex, node.value) is not None + return False + + def supported_element_nodes(self): + return [StringNode] + +class MTypeIDList(MTypeList): + def __init__(self, node: T.Optional[BaseNode] = None): + super().__init__(node) + + def _new_element_node(self, value): + return IdNode(Token('', '', 0, 0, 0, None, str(value))) + + def 
_check_is_equal(self, node, value) -> bool: + if isinstance(node, IdNode): + return node.value == value + return False + + def _check_regex_matches(self, node, regex: str) -> bool: + if isinstance(node, StringNode): + return re.match(regex, node.value) is not None + return False + + def supported_element_nodes(self): + return [IdNode] + +rewriter_keys = { + 'default_options': { + 'operation': (str, None, ['set', 'delete']), + 'options': (dict, {}, None) + }, + 'kwargs': { + 'function': (str, None, None), + 'id': (str, None, None), + 'operation': (str, None, ['set', 'delete', 'add', 'remove', 'remove_regex', 'info']), + 'kwargs': (dict, {}, None) + }, + 'target': { + 'target': (str, None, None), + 'operation': (str, None, ['src_add', 'src_rm', 'target_rm', 'target_add', 'extra_files_add', 'extra_files_rm', 'info']), + 'sources': (list, [], None), + 'subdir': (str, '', None), + 'target_type': (str, 'executable', ['both_libraries', 'executable', 'jar', 'library', 'shared_library', 'shared_module', 'static_library']), + } +} + +rewriter_func_kwargs = { + 'dependency': { + 'language': MTypeStr, + 'method': MTypeStr, + 'native': MTypeBool, + 'not_found_message': MTypeStr, + 'required': MTypeBool, + 'static': MTypeBool, + 'version': MTypeStrList, + 'modules': MTypeStrList + }, + 'target': { + 'build_by_default': MTypeBool, + 'build_rpath': MTypeStr, + 'dependencies': MTypeIDList, + 'gui_app': MTypeBool, + 'link_with': MTypeIDList, + 'export_dynamic': MTypeBool, + 'implib': MTypeBool, + 'install': MTypeBool, + 'install_dir': MTypeStr, + 'install_rpath': MTypeStr, + 'pie': MTypeBool + }, + 'project': { + 'default_options': MTypeStrList, + 'meson_version': MTypeStr, + 'license': MTypeStrList, + 'subproject_dir': MTypeStr, + 'version': MTypeStr + } +} + +class Rewriter: + def __init__(self, sourcedir: str, generator: str = 'ninja', skip_errors: bool = False): + self.sourcedir = sourcedir + self.interpreter = IntrospectionInterpreter(sourcedir, '', generator, visitors = [AstIDGenerator(), AstIndentationGenerator(), AstConditionLevel()]) + self.skip_errors = skip_errors + self.modified_nodes = [] + self.to_remove_nodes = [] + self.to_add_nodes = [] + self.functions = { + 'default_options': self.process_default_options, + 'kwargs': self.process_kwargs, + 'target': self.process_target, + } + self.info_dump = None + + def analyze_meson(self): + mlog.log('Analyzing meson file:', mlog.bold(os.path.join(self.sourcedir, environment.build_filename))) + self.interpreter.analyze() + mlog.log(' -- Project:', mlog.bold(self.interpreter.project_data['descriptive_name'])) + mlog.log(' -- Version:', mlog.cyan(self.interpreter.project_data['version'])) + + def add_info(self, cmd_type: str, cmd_id: str, data: dict): + if self.info_dump is None: + self.info_dump = {} + if cmd_type not in self.info_dump: + self.info_dump[cmd_type] = {} + self.info_dump[cmd_type][cmd_id] = data + + def print_info(self): + if self.info_dump is None: + return + sys.stderr.write(json.dumps(self.info_dump, indent=2)) + + def on_error(self): + if self.skip_errors: + return mlog.cyan('-->'), mlog.yellow('skipping') + return mlog.cyan('-->'), mlog.red('aborting') + + def handle_error(self): + if self.skip_errors: + return None + raise MesonException('Rewriting the meson.build failed') + + def find_target(self, target: str): + def check_list(name: str) -> T.List[BaseNode]: + result = [] + for i in self.interpreter.targets: + if name in {i['name'], i['id']}: + result += [i] + return result + + targets = check_list(target) + if targets: + if 
len(targets) == 1: + return targets[0] + else: + mlog.error('There are multiple targets matching', mlog.bold(target)) + for i in targets: + mlog.error(' -- Target name', mlog.bold(i['name']), 'with ID', mlog.bold(i['id'])) + mlog.error('Please try again with the unique ID of the target', *self.on_error()) + self.handle_error() + return None + + # Check the assignments + tgt = None + if target in self.interpreter.assignments: + node = self.interpreter.assignments[target] + if isinstance(node, FunctionNode): + if node.func_name in {'executable', 'jar', 'library', 'shared_library', 'shared_module', 'static_library', 'both_libraries'}: + tgt = self.interpreter.assign_vals[target] + + return tgt + + def find_dependency(self, dependency: str): + def check_list(name: str): + for i in self.interpreter.dependencies: + if name == i['name']: + return i + return None + + dep = check_list(dependency) + if dep is not None: + return dep + + # Check the assignments + if dependency in self.interpreter.assignments: + node = self.interpreter.assignments[dependency] + if isinstance(node, FunctionNode): + if node.func_name == 'dependency': + name = self.interpreter.flatten_args(node.args)[0] + dep = check_list(name) + + return dep + + @RequiredKeys(rewriter_keys['default_options']) + def process_default_options(self, cmd): + # First, remove the old values + kwargs_cmd = { + 'function': 'project', + 'id': "/", + 'operation': 'remove_regex', + 'kwargs': { + 'default_options': [f'{x}=.*' for x in cmd['options'].keys()] + } + } + self.process_kwargs(kwargs_cmd) + + # Then add the new values + if cmd['operation'] != 'set': + return + + kwargs_cmd['operation'] = 'add' + kwargs_cmd['kwargs']['default_options'] = [] + + cdata = self.interpreter.coredata + options = { + **{str(k): v for k, v in cdata.options.items()}, + **{str(k): v for k, v in cdata.options.items()}, + **{str(k): v for k, v in cdata.options.items()}, + **{str(k): v for k, v in cdata.options.items()}, + **{str(k): v for k, v in cdata.options.items()}, + } + + for key, val in sorted(cmd['options'].items()): + if key not in options: + mlog.error('Unknown options', mlog.bold(key), *self.on_error()) + self.handle_error() + continue + + try: + val = options[key].validate_value(val) + except MesonException as e: + mlog.error('Unable to set', mlog.bold(key), mlog.red(str(e)), *self.on_error()) + self.handle_error() + continue + + kwargs_cmd['kwargs']['default_options'] += [f'{key}={val}'] + + self.process_kwargs(kwargs_cmd) + + @RequiredKeys(rewriter_keys['kwargs']) + def process_kwargs(self, cmd): + mlog.log('Processing function type', mlog.bold(cmd['function']), 'with id', mlog.cyan("'" + cmd['id'] + "'")) + if cmd['function'] not in rewriter_func_kwargs: + mlog.error('Unknown function type', cmd['function'], *self.on_error()) + return self.handle_error() + kwargs_def = rewriter_func_kwargs[cmd['function']] + + # Find the function node to modify + node = None + arg_node = None + if cmd['function'] == 'project': + # msys bash may expand '/' to a path. It will mangle '//' to '/' + # but in order to keep usage shell-agnostic, also allow `//` as + # the function ID such that it will work in both msys bash and + # other shells. 
+ if {'/', '//'}.isdisjoint({cmd['id']}): + mlog.error('The ID for the function type project must be "/" or "//" not "' + cmd['id'] + '"', *self.on_error()) + return self.handle_error() + node = self.interpreter.project_node + arg_node = node.args + elif cmd['function'] == 'target': + tmp = self.find_target(cmd['id']) + if tmp: + node = tmp['node'] + arg_node = node.args + elif cmd['function'] == 'dependency': + tmp = self.find_dependency(cmd['id']) + if tmp: + node = tmp['node'] + arg_node = node.args + if not node: + mlog.error('Unable to find the function node') + assert isinstance(node, FunctionNode) + assert isinstance(arg_node, ArgumentNode) + # Transform the key nodes to plain strings + arg_node.kwargs = {k.value: v for k, v in arg_node.kwargs.items()} + + # Print kwargs info + if cmd['operation'] == 'info': + info_data = {} + for key, val in sorted(arg_node.kwargs.items()): + info_data[key] = None + if isinstance(val, ElementaryNode): + info_data[key] = val.value + elif isinstance(val, ArrayNode): + data_list = [] + for i in val.args.arguments: + element = None + if isinstance(i, ElementaryNode): + element = i.value + data_list += [element] + info_data[key] = data_list + + self.add_info('kwargs', '{}#{}'.format(cmd['function'], cmd['id']), info_data) + return # Nothing else to do + + # Modify the kwargs + num_changed = 0 + for key, val in sorted(cmd['kwargs'].items()): + if key not in kwargs_def: + mlog.error('Cannot modify unknown kwarg', mlog.bold(key), *self.on_error()) + self.handle_error() + continue + + # Remove the key from the kwargs + if cmd['operation'] == 'delete': + if key in arg_node.kwargs: + mlog.log(' -- Deleting', mlog.bold(key), 'from the kwargs') + del arg_node.kwargs[key] + num_changed += 1 + else: + mlog.log(' -- Key', mlog.bold(key), 'is already deleted') + continue + + if key not in arg_node.kwargs: + arg_node.kwargs[key] = None + modifyer = kwargs_def[key](arg_node.kwargs[key]) + if not modifyer.can_modify(): + mlog.log(' -- Skipping', mlog.bold(key), 'because it is to complex to modify') + + # Apply the operation + val_str = str(val) + if cmd['operation'] == 'set': + mlog.log(' -- Setting', mlog.bold(key), 'to', mlog.yellow(val_str)) + modifyer.set_value(val) + elif cmd['operation'] == 'add': + mlog.log(' -- Adding', mlog.yellow(val_str), 'to', mlog.bold(key)) + modifyer.add_value(val) + elif cmd['operation'] == 'remove': + mlog.log(' -- Removing', mlog.yellow(val_str), 'from', mlog.bold(key)) + modifyer.remove_value(val) + elif cmd['operation'] == 'remove_regex': + mlog.log(' -- Removing all values matching', mlog.yellow(val_str), 'from', mlog.bold(key)) + modifyer.remove_regex(val) + + # Write back the result + arg_node.kwargs[key] = modifyer.get_node() + num_changed += 1 + + # Convert the keys back to IdNode's + arg_node.kwargs = {IdNode(Token('', '', 0, 0, 0, None, k)): v for k, v in arg_node.kwargs.items()} + if num_changed > 0 and node not in self.modified_nodes: + self.modified_nodes += [node] + + def find_assignment_node(self, node: BaseNode) -> AssignmentNode: + if node.ast_id and node.ast_id in self.interpreter.reverse_assignment: + return self.interpreter.reverse_assignment[node.ast_id] + return None + + @RequiredKeys(rewriter_keys['target']) + def process_target(self, cmd): + mlog.log('Processing target', mlog.bold(cmd['target']), 'operation', mlog.cyan(cmd['operation'])) + target = self.find_target(cmd['target']) + if target is None and cmd['operation'] != 'target_add': + mlog.error('Unknown target', mlog.bold(cmd['target']), *self.on_error()) 
+ return self.handle_error() + + # Make source paths relative to the current subdir + def rel_source(src: str) -> str: + subdir = os.path.abspath(os.path.join(self.sourcedir, target['subdir'])) + if os.path.isabs(src): + return os.path.relpath(src, subdir) + elif not os.path.exists(src): + return src # Trust the user when the source doesn't exist + # Make sure that the path is relative to the subdir + return os.path.relpath(os.path.abspath(src), subdir) + + if target is not None: + cmd['sources'] = [rel_source(x) for x in cmd['sources']] + + # Utility function to get a list of the sources from a node + def arg_list_from_node(n): + args = [] + if isinstance(n, FunctionNode): + args = list(n.args.arguments) + if n.func_name in BUILD_TARGET_FUNCTIONS: + args.pop(0) + elif isinstance(n, ArrayNode): + args = n.args.arguments + elif isinstance(n, ArgumentNode): + args = n.arguments + return args + + to_sort_nodes = [] + + if cmd['operation'] == 'src_add': + node = None + if target['sources']: + node = target['sources'][0] + else: + node = target['node'] + assert node is not None + + # Generate the current source list + src_list = [] + for i in target['sources']: + for j in arg_list_from_node(i): + if isinstance(j, StringNode): + src_list += [j.value] + + # Generate the new String nodes + to_append = [] + for i in sorted(set(cmd['sources'])): + if i in src_list: + mlog.log(' -- Source', mlog.green(i), 'is already defined for the target --> skipping') + continue + mlog.log(' -- Adding source', mlog.green(i), 'at', + mlog.yellow(f'{node.filename}:{node.lineno}')) + token = Token('string', node.filename, 0, 0, 0, None, i) + to_append += [StringNode(token)] + + # Append to the AST at the right place + arg_node = None + if isinstance(node, (FunctionNode, ArrayNode)): + arg_node = node.args + elif isinstance(node, ArgumentNode): + arg_node = node + assert arg_node is not None + arg_node.arguments += to_append + + # Mark the node as modified + if arg_node not in to_sort_nodes and not isinstance(node, FunctionNode): + to_sort_nodes += [arg_node] + if node not in self.modified_nodes: + self.modified_nodes += [node] + + elif cmd['operation'] == 'src_rm': + # Helper to find the exact string node and its parent + def find_node(src): + for i in target['sources']: + for j in arg_list_from_node(i): + if isinstance(j, StringNode): + if j.value == src: + return i, j + return None, None + + for i in cmd['sources']: + # Try to find the node with the source string + root, string_node = find_node(i) + if root is None: + mlog.warning(' -- Unable to find source', mlog.green(i), 'in the target') + continue + + # Remove the found string node from the argument list + arg_node = None + if isinstance(root, (FunctionNode, ArrayNode)): + arg_node = root.args + elif isinstance(root, ArgumentNode): + arg_node = root + assert arg_node is not None + mlog.log(' -- Removing source', mlog.green(i), 'from', + mlog.yellow(f'{string_node.filename}:{string_node.lineno}')) + arg_node.arguments.remove(string_node) + + # Mark the node as modified + if arg_node not in to_sort_nodes and not isinstance(root, FunctionNode): + to_sort_nodes += [arg_node] + if root not in self.modified_nodes: + self.modified_nodes += [root] + + elif cmd['operation'] == 'extra_files_add': + tgt_function: FunctionNode = target['node'] + mark_array = True + try: + node = target['extra_files'][0] + except IndexError: + # Specifying `extra_files` with a list that flattens to empty gives an empty + # target['extra_files'] list, account for that. 
+ try: + extra_files_key = next(k for k in tgt_function.args.kwargs.keys() if isinstance(k, IdNode) and k.value == 'extra_files') + node = tgt_function.args.kwargs[extra_files_key] + except StopIteration: + # Target has no extra_files kwarg, create one + node = ArrayNode(ArgumentNode(Token('', tgt_function.filename, 0, 0, 0, None, '[]')), tgt_function.end_lineno, tgt_function.end_colno, tgt_function.end_lineno, tgt_function.end_colno) + tgt_function.args.kwargs[IdNode(Token('string', tgt_function.filename, 0, 0, 0, None, 'extra_files'))] = node + mark_array = False + if tgt_function not in self.modified_nodes: + self.modified_nodes += [tgt_function] + target['extra_files'] = [node] + if isinstance(node, IdNode): + node = self.interpreter.assignments[node.value] + target['extra_files'] = [node] + if not isinstance(node, ArrayNode): + mlog.error('Target', mlog.bold(cmd['target']), 'extra_files argument must be a list', *self.on_error()) + return self.handle_error() + + # Generate the current extra files list + extra_files_list = [] + for i in target['extra_files']: + for j in arg_list_from_node(i): + if isinstance(j, StringNode): + extra_files_list += [j.value] + + # Generate the new String nodes + to_append = [] + for i in sorted(set(cmd['sources'])): + if i in extra_files_list: + mlog.log(' -- Extra file', mlog.green(i), 'is already defined for the target --> skipping') + continue + mlog.log(' -- Adding extra file', mlog.green(i), 'at', + mlog.yellow(f'{node.filename}:{node.lineno}')) + token = Token('string', node.filename, 0, 0, 0, None, i) + to_append += [StringNode(token)] + + # Append to the AST at the right place + arg_node = node.args + arg_node.arguments += to_append + + # Mark the node as modified + if arg_node not in to_sort_nodes: + to_sort_nodes += [arg_node] + # If the extra_files array is newly created, don't mark it as its parent function node already is, + # otherwise this would cause double modification. 
+ if mark_array and node not in self.modified_nodes: + self.modified_nodes += [node] + + elif cmd['operation'] == 'extra_files_rm': + # Helper to find the exact string node and its parent + def find_node(src): + for i in target['extra_files']: + for j in arg_list_from_node(i): + if isinstance(j, StringNode): + if j.value == src: + return i, j + return None, None + + for i in cmd['sources']: + # Try to find the node with the source string + root, string_node = find_node(i) + if root is None: + mlog.warning(' -- Unable to find extra file', mlog.green(i), 'in the target') + continue + + # Remove the found string node from the argument list + arg_node = root.args + mlog.log(' -- Removing extra file', mlog.green(i), 'from', + mlog.yellow(f'{string_node.filename}:{string_node.lineno}')) + arg_node.arguments.remove(string_node) + + # Mark the node as modified + if arg_node not in to_sort_nodes and not isinstance(root, FunctionNode): + to_sort_nodes += [arg_node] + if root not in self.modified_nodes: + self.modified_nodes += [root] + + elif cmd['operation'] == 'target_add': + if target is not None: + mlog.error('Can not add target', mlog.bold(cmd['target']), 'because it already exists', *self.on_error()) + return self.handle_error() + + id_base = re.sub(r'[- ]', '_', cmd['target']) + target_id = id_base + '_exe' if cmd['target_type'] == 'executable' else '_lib' + source_id = id_base + '_sources' + filename = os.path.join(cmd['subdir'], environment.build_filename) + + # Build src list + src_arg_node = ArgumentNode(Token('string', filename, 0, 0, 0, None, '')) + src_arr_node = ArrayNode(src_arg_node, 0, 0, 0, 0) + src_far_node = ArgumentNode(Token('string', filename, 0, 0, 0, None, '')) + src_fun_node = FunctionNode(filename, 0, 0, 0, 0, 'files', src_far_node) + src_ass_node = AssignmentNode(filename, 0, 0, source_id, src_fun_node) + src_arg_node.arguments = [StringNode(Token('string', filename, 0, 0, 0, None, x)) for x in cmd['sources']] + src_far_node.arguments = [src_arr_node] + + # Build target + tgt_arg_node = ArgumentNode(Token('string', filename, 0, 0, 0, None, '')) + tgt_fun_node = FunctionNode(filename, 0, 0, 0, 0, cmd['target_type'], tgt_arg_node) + tgt_ass_node = AssignmentNode(filename, 0, 0, target_id, tgt_fun_node) + tgt_arg_node.arguments = [ + StringNode(Token('string', filename, 0, 0, 0, None, cmd['target'])), + IdNode(Token('string', filename, 0, 0, 0, None, source_id)) + ] + + src_ass_node.accept(AstIndentationGenerator()) + tgt_ass_node.accept(AstIndentationGenerator()) + self.to_add_nodes += [src_ass_node, tgt_ass_node] + + elif cmd['operation'] == 'target_rm': + to_remove = self.find_assignment_node(target['node']) + if to_remove is None: + to_remove = target['node'] + self.to_remove_nodes += [to_remove] + mlog.log(' -- Removing target', mlog.green(cmd['target']), 'at', + mlog.yellow(f'{to_remove.filename}:{to_remove.lineno}')) + + elif cmd['operation'] == 'info': + # T.List all sources in the target + src_list = [] + for i in target['sources']: + for j in arg_list_from_node(i): + if isinstance(j, StringNode): + src_list += [j.value] + extra_files_list = [] + for i in target['extra_files']: + for j in arg_list_from_node(i): + if isinstance(j, StringNode): + extra_files_list += [j.value] + test_data = { + 'name': target['name'], + 'sources': src_list, + 'extra_files': extra_files_list + } + self.add_info('target', target['id'], test_data) + + # Sort files + for i in to_sort_nodes: + convert = lambda text: int(text) if text.isdigit() else text.lower() + alphanum_key = lambda key: 
[convert(c) for c in re.split('([0-9]+)', key)] + path_sorter = lambda key: ([(key.count('/') <= idx, alphanum_key(x)) for idx, x in enumerate(key.split('/'))]) + + unknown = [x for x in i.arguments if not isinstance(x, StringNode)] + sources = [x for x in i.arguments if isinstance(x, StringNode)] + sources = sorted(sources, key=lambda x: path_sorter(x.value)) + i.arguments = unknown + sources + + def process(self, cmd): + if 'type' not in cmd: + raise RewriterException('Command has no key "type"') + if cmd['type'] not in self.functions: + raise RewriterException('Unknown command "{}". Supported commands are: {}' + .format(cmd['type'], list(self.functions.keys()))) + self.functions[cmd['type']](cmd) + + def apply_changes(self): + assert all(hasattr(x, 'lineno') and hasattr(x, 'colno') and hasattr(x, 'filename') for x in self.modified_nodes) + assert all(hasattr(x, 'lineno') and hasattr(x, 'colno') and hasattr(x, 'filename') for x in self.to_remove_nodes) + assert all(isinstance(x, (ArrayNode, FunctionNode)) for x in self.modified_nodes) + assert all(isinstance(x, (ArrayNode, AssignmentNode, FunctionNode)) for x in self.to_remove_nodes) + # Sort based on line and column in reversed order + work_nodes = [{'node': x, 'action': 'modify'} for x in self.modified_nodes] + work_nodes += [{'node': x, 'action': 'rm'} for x in self.to_remove_nodes] + work_nodes = sorted(work_nodes, key=lambda x: (x['node'].lineno, x['node'].colno), reverse=True) + work_nodes += [{'node': x, 'action': 'add'} for x in self.to_add_nodes] + + # Generating the new replacement string + str_list = [] + for i in work_nodes: + new_data = '' + if i['action'] == 'modify' or i['action'] == 'add': + printer = AstPrinter() + i['node'].accept(printer) + printer.post_process() + new_data = printer.result.strip() + data = { + 'file': i['node'].filename, + 'str': new_data, + 'node': i['node'], + 'action': i['action'] + } + str_list += [data] + + # Load build files + files = {} + for i in str_list: + if i['file'] in files: + continue + fpath = os.path.realpath(os.path.join(self.sourcedir, i['file'])) + fdata = '' + # Create an empty file if it does not exist + if not os.path.exists(fpath): + with open(fpath, 'w', encoding='utf-8'): + pass + with open(fpath, encoding='utf-8') as fp: + fdata = fp.read() + + # Generate line offsets numbers + m_lines = fdata.splitlines(True) + offset = 0 + line_offsets = [] + for j in m_lines: + line_offsets += [offset] + offset += len(j) + + files[i['file']] = { + 'path': fpath, + 'raw': fdata, + 'offsets': line_offsets + } + + # Replace in source code + def remove_node(i): + offsets = files[i['file']]['offsets'] + raw = files[i['file']]['raw'] + node = i['node'] + line = node.lineno - 1 + col = node.colno + start = offsets[line] + col + end = start + if isinstance(node, (ArrayNode, FunctionNode)): + end = offsets[node.end_lineno - 1] + node.end_colno + + # Only removal is supported for assignments + elif isinstance(node, AssignmentNode) and i['action'] == 'rm': + if isinstance(node.value, (ArrayNode, FunctionNode)): + remove_node({'file': i['file'], 'str': '', 'node': node.value, 'action': 'rm'}) + raw = files[i['file']]['raw'] + while raw[end] != '=': + end += 1 + end += 1 # Handle the '=' + while raw[end] in {' ', '\n', '\t'}: + end += 1 + + files[i['file']]['raw'] = raw[:start] + i['str'] + raw[end:] + + for i in str_list: + if i['action'] in {'modify', 'rm'}: + remove_node(i) + elif i['action'] == 'add': + files[i['file']]['raw'] += i['str'] + '\n' + + # Write the files back + for key, val in 
files.items(): + mlog.log('Rewriting', mlog.yellow(key)) + with open(val['path'], 'w', encoding='utf-8') as fp: + fp.write(val['raw']) + +target_operation_map = { + 'add': 'src_add', + 'rm': 'src_rm', + 'add_target': 'target_add', + 'rm_target': 'target_rm', + 'add_extra_files': 'extra_files_add', + 'rm_extra_files': 'extra_files_rm', + 'info': 'info', +} + +def list_to_dict(in_list: T.List[str]) -> T.Dict[str, str]: + result = {} + it = iter(in_list) + try: + for i in it: + # calling next(it) is not a mistake, we're taking the next element from + # the iterator, avoiding the need to preprocess it into a sequence of + # key value pairs. + result[i] = next(it) + except StopIteration: + raise TypeError('in_list parameter of list_to_dict must have an even length.') + return result + +def generate_target(options) -> T.List[dict]: + return [{ + 'type': 'target', + 'target': options.target, + 'operation': target_operation_map[options.operation], + 'sources': options.sources, + 'subdir': options.subdir, + 'target_type': options.tgt_type, + }] + +def generate_kwargs(options) -> T.List[dict]: + return [{ + 'type': 'kwargs', + 'function': options.function, + 'id': options.id, + 'operation': options.operation, + 'kwargs': list_to_dict(options.kwargs), + }] + +def generate_def_opts(options) -> T.List[dict]: + return [{ + 'type': 'default_options', + 'operation': options.operation, + 'options': list_to_dict(options.options), + }] + +def generate_cmd(options) -> T.List[dict]: + if os.path.exists(options.json): + with open(options.json, encoding='utf-8') as fp: + return json.load(fp) + else: + return json.loads(options.json) + +# Map options.type to the actual type name +cli_type_map = { + 'target': generate_target, + 'tgt': generate_target, + 'kwargs': generate_kwargs, + 'default-options': generate_def_opts, + 'def': generate_def_opts, + 'command': generate_cmd, + 'cmd': generate_cmd, +} + +def run(options): + if not options.verbose: + mlog.set_quiet() + + try: + rewriter = Rewriter(options.sourcedir, skip_errors=options.skip) + rewriter.analyze_meson() + + if options.type is None: + mlog.error('No command specified') + return 1 + + commands = cli_type_map[options.type](options) + + if not isinstance(commands, list): + raise TypeError('Command is not a list') + + for i in commands: + if not isinstance(i, object): + raise TypeError('Command is not an object') + rewriter.process(i) + + rewriter.apply_changes() + rewriter.print_info() + return 0 + except Exception as e: + raise e + finally: + mlog.set_verbose() -- cgit v1.2.3
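
Note (not part of the patch): besides the per-type subcommands, the file above accepts batched commands through the `command` subparser defined in add_arguments() — a JSON array of objects whose keys are validated by RequiredKeys against the `rewriter_keys` table and dispatched by Rewriter.process(). The sketch below builds such an array in Python, assuming the format shown in the patch; the target name "exe1", the source file "main.cpp", and the version string "1.2.3" are made-up placeholders, and the comments give the equivalent one-shot CLI invocations that generate_target()/generate_kwargs() would produce (the `meson rewrite` entry point itself is wired up elsewhere in mesonbuild and is not part of this patch).

# sketch.py -- assemble a command list for `meson rewrite command <json>`
import json

commands = [
    {   # roughly equivalent to: meson rewrite target exe1 add main.cpp
        'type': 'target',          # dispatched to Rewriter.process_target()
        'target': 'exe1',          # placeholder target name or ID
        'operation': 'src_add',    # CLI 'add' maps to 'src_add' via target_operation_map
        'sources': ['main.cpp'],   # placeholder source file
        # 'subdir' and 'target_type' are optional; RequiredKeys fills in
        # the defaults '' and 'executable' from rewriter_keys['target']
    },
    {   # roughly equivalent to: meson rewrite kwargs set project / version 1.2.3
        'type': 'kwargs',          # dispatched to Rewriter.process_kwargs()
        'function': 'project',     # must be a key of rewriter_func_kwargs
        'id': '/',                 # '/' (or '//') is required for 'project'
        'operation': 'set',
        'kwargs': {'version': '1.2.3'},  # placeholder value, handled as MTypeStr
    },
]

# The resulting JSON string (or a file containing it) is what the
# 'command' subparser's positional 'json' argument expects.
print(json.dumps(commands, indent=2))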