diff options
Diffstat (limited to 'test/lib/ansible_test/_internal/diff.py')
-rw-r--r-- | test/lib/ansible_test/_internal/diff.py | 256 |
1 files changed, 256 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/diff.py b/test/lib/ansible_test/_internal/diff.py new file mode 100644 index 00000000..1e2038b9 --- /dev/null +++ b/test/lib/ansible_test/_internal/diff.py @@ -0,0 +1,256 @@ +"""Diff parsing functions and classes.""" +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import re +import textwrap +import traceback + +from . import types as t + +from .util import ( + ApplicationError, +) + + +def parse_diff(lines): + """ + :type lines: list[str] + :rtype: list[FileDiff] + """ + return DiffParser(lines).files + + +class FileDiff: + """Parsed diff for a single file.""" + def __init__(self, old_path, new_path): + """ + :type old_path: str + :type new_path: str + """ + self.old = DiffSide(old_path, new=False) + self.new = DiffSide(new_path, new=True) + self.headers = [] # type: t.List[str] + self.binary = False + + def append_header(self, line): + """ + :type line: str + """ + self.headers.append(line) + + @property + def is_complete(self): + """ + :rtype: bool + """ + return self.old.is_complete and self.new.is_complete + + +class DiffSide: + """Parsed diff for a single 'side' of a single file.""" + def __init__(self, path, new): + """ + :type path: str + :type new: bool + """ + self.path = path + self.new = new + self.prefix = '+' if self.new else '-' + self.eof_newline = True + self.exists = True + + self.lines = [] # type: t.List[t.Tuple[int, str]] + self.lines_and_context = [] # type: t.List[t.Tuple[int, str]] + self.ranges = [] # type: t.List[t.Tuple[int, int]] + + self._next_line_number = 0 + self._lines_remaining = 0 + self._range_start = 0 + + def set_start(self, line_start, line_count): + """ + :type line_start: int + :type line_count: int + """ + self._next_line_number = line_start + self._lines_remaining = line_count + self._range_start = 0 + + def append(self, line): + """ + :type line: str + """ + if self._lines_remaining <= 0: + raise Exception('Diff range overflow.') + + entry = self._next_line_number, line + + if line.startswith(' '): + pass + elif line.startswith(self.prefix): + self.lines.append(entry) + + if not self._range_start: + self._range_start = self._next_line_number + else: + raise Exception('Unexpected diff content prefix.') + + self.lines_and_context.append(entry) + + self._lines_remaining -= 1 + + if self._range_start: + if self.is_complete: + range_end = self._next_line_number + elif line.startswith(' '): + range_end = self._next_line_number - 1 + else: + range_end = 0 + + if range_end: + self.ranges.append((self._range_start, range_end)) + self._range_start = 0 + + self._next_line_number += 1 + + @property + def is_complete(self): + """ + :rtype: bool + """ + return self._lines_remaining == 0 + + def format_lines(self, context=True): + """ + :type context: bool + :rtype: list[str] + """ + if context: + lines = self.lines_and_context + else: + lines = self.lines + + return ['%s:%4d %s' % (self.path, line[0], line[1]) for line in lines] + + +class DiffParser: + """Parse diff lines.""" + def __init__(self, lines): + """ + :type lines: list[str] + """ + self.lines = lines + self.files = [] # type: t.List[FileDiff] + + self.action = self.process_start + self.line_number = 0 + self.previous_line = None # type: t.Optional[str] + self.line = None # type: t.Optional[str] + self.file = None # type: t.Optional[FileDiff] + + for self.line in self.lines: + self.line_number += 1 + + try: + self.action() + except Exception as ex: + message = textwrap.dedent(''' + %s + + Line: %d + Previous: %s + Current: %s + %s + ''').strip() % ( + ex, + self.line_number, + self.previous_line or '', + self.line or '', + traceback.format_exc(), + ) + + raise ApplicationError(message.strip()) + + self.previous_line = self.line + + self.complete_file() + + def process_start(self): + """Process a diff start line.""" + self.complete_file() + + match = re.search(r'^diff --git "?a/(?P<old_path>.*)"? "?b/(?P<new_path>.*)"?$', self.line) + + if not match: + raise Exception('Unexpected diff start line.') + + self.file = FileDiff(match.group('old_path'), match.group('new_path')) + self.action = self.process_continue + + def process_range(self): + """Process a diff range line.""" + match = re.search(r'^@@ -((?P<old_start>[0-9]+),)?(?P<old_count>[0-9]+) \+((?P<new_start>[0-9]+),)?(?P<new_count>[0-9]+) @@', self.line) + + if not match: + raise Exception('Unexpected diff range line.') + + self.file.old.set_start(int(match.group('old_start') or 1), int(match.group('old_count'))) + self.file.new.set_start(int(match.group('new_start') or 1), int(match.group('new_count'))) + self.action = self.process_content + + def process_continue(self): + """Process a diff start, range or header line.""" + if self.line.startswith('diff '): + self.process_start() + elif self.line.startswith('@@ '): + self.process_range() + else: + self.process_header() + + def process_header(self): + """Process a diff header line.""" + if self.line.startswith('Binary files '): + self.file.binary = True + elif self.line == '--- /dev/null': + self.file.old.exists = False + elif self.line == '+++ /dev/null': + self.file.new.exists = False + else: + self.file.append_header(self.line) + + def process_content(self): + """Process a diff content line.""" + if self.line == r'\ No newline at end of file': + if self.previous_line.startswith(' '): + self.file.old.eof_newline = False + self.file.new.eof_newline = False + elif self.previous_line.startswith('-'): + self.file.old.eof_newline = False + elif self.previous_line.startswith('+'): + self.file.new.eof_newline = False + else: + raise Exception('Unexpected previous diff content line.') + + return + + if self.file.is_complete: + self.process_continue() + return + + if self.line.startswith(' '): + self.file.old.append(self.line) + self.file.new.append(self.line) + elif self.line.startswith('-'): + self.file.old.append(self.line) + elif self.line.startswith('+'): + self.file.new.append(self.line) + else: + raise Exception('Unexpected diff content line.') + + def complete_file(self): + """Complete processing of the current file, if any.""" + if not self.file: + return + + self.files.append(self.file) |