diff options
Diffstat (limited to 'test/lib/ansible_test/_internal/diff.py')
-rw-r--r-- | test/lib/ansible_test/_internal/diff.py | 226 |
1 files changed, 226 insertions, 0 deletions
diff --git a/test/lib/ansible_test/_internal/diff.py b/test/lib/ansible_test/_internal/diff.py new file mode 100644 index 0000000..edaf6c5 --- /dev/null +++ b/test/lib/ansible_test/_internal/diff.py @@ -0,0 +1,226 @@ +"""Diff parsing functions and classes.""" +from __future__ import annotations + +import re +import textwrap +import traceback +import typing as t + +from .util import ( + ApplicationError, +) + + +def parse_diff(lines: list[str]) -> list[FileDiff]: + """Parse the given diff lines and return a list of FileDiff objects representing the changes of each file.""" + return DiffParser(lines).files + + +class FileDiff: + """Parsed diff for a single file.""" + def __init__(self, old_path: str, new_path: str) -> None: + self.old = DiffSide(old_path, new=False) + self.new = DiffSide(new_path, new=True) + self.headers: list[str] = [] + self.binary = False + + def append_header(self, line: str) -> None: + """Append the given line to the list of headers for this file.""" + self.headers.append(line) + + @property + def is_complete(self) -> bool: + """True if the diff is complete, otherwise False.""" + return self.old.is_complete and self.new.is_complete + + +class DiffSide: + """Parsed diff for a single 'side' of a single file.""" + def __init__(self, path: str, new: bool) -> None: + self.path = path + self.new = new + self.prefix = '+' if self.new else '-' + self.eof_newline = True + self.exists = True + + self.lines: list[tuple[int, str]] = [] + self.lines_and_context: list[tuple[int, str]] = [] + self.ranges: list[tuple[int, int]] = [] + + self._next_line_number = 0 + self._lines_remaining = 0 + self._range_start = 0 + + def set_start(self, line_start: int, line_count: int) -> None: + """Set the starting line and line count.""" + self._next_line_number = line_start + self._lines_remaining = line_count + self._range_start = 0 + + def append(self, line: str) -> None: + """Append the given line.""" + if self._lines_remaining <= 0: + raise Exception('Diff range overflow.') + + entry = self._next_line_number, line + + if line.startswith(' '): + pass + elif line.startswith(self.prefix): + self.lines.append(entry) + + if not self._range_start: + self._range_start = self._next_line_number + else: + raise Exception('Unexpected diff content prefix.') + + self.lines_and_context.append(entry) + + self._lines_remaining -= 1 + + if self._range_start: + if self.is_complete: + range_end = self._next_line_number + elif line.startswith(' '): + range_end = self._next_line_number - 1 + else: + range_end = 0 + + if range_end: + self.ranges.append((self._range_start, range_end)) + self._range_start = 0 + + self._next_line_number += 1 + + @property + def is_complete(self) -> bool: + """True if the diff is complete, otherwise False.""" + return self._lines_remaining == 0 + + def format_lines(self, context: bool = True) -> list[str]: + """Format the diff and return a list of lines, optionally including context.""" + if context: + lines = self.lines_and_context + else: + lines = self.lines + + return ['%s:%4d %s' % (self.path, line[0], line[1]) for line in lines] + + +class DiffParser: + """Parse diff lines.""" + def __init__(self, lines: list[str]) -> None: + self.lines = lines + self.files: list[FileDiff] = [] + + self.action = self.process_start + self.line_number = 0 + self.previous_line: t.Optional[str] = None + self.line: t.Optional[str] = None + self.file: t.Optional[FileDiff] = None + + for self.line in self.lines: + self.line_number += 1 + + try: + self.action() + except Exception as ex: + message = textwrap.dedent(''' + %s + + Line: %d + Previous: %s + Current: %s + %s + ''').strip() % ( + ex, + self.line_number, + self.previous_line or '', + self.line or '', + traceback.format_exc(), + ) + + raise ApplicationError(message.strip()) + + self.previous_line = self.line + + self.complete_file() + + def process_start(self) -> None: + """Process a diff start line.""" + self.complete_file() + + match = re.search(r'^diff --git "?(?:a/)?(?P<old_path>.*)"? "?(?:b/)?(?P<new_path>.*)"?$', self.line) + + if not match: + raise Exception('Unexpected diff start line.') + + self.file = FileDiff(match.group('old_path'), match.group('new_path')) + self.action = self.process_continue + + def process_range(self) -> None: + """Process a diff range line.""" + match = re.search(r'^@@ -((?P<old_start>[0-9]+),)?(?P<old_count>[0-9]+) \+((?P<new_start>[0-9]+),)?(?P<new_count>[0-9]+) @@', self.line) + + if not match: + raise Exception('Unexpected diff range line.') + + self.file.old.set_start(int(match.group('old_start') or 1), int(match.group('old_count'))) + self.file.new.set_start(int(match.group('new_start') or 1), int(match.group('new_count'))) + self.action = self.process_content + + def process_continue(self) -> None: + """Process a diff start, range or header line.""" + if self.line.startswith('diff '): + self.process_start() + elif self.line.startswith('@@ '): + self.process_range() + else: + self.process_header() + + def process_header(self) -> None: + """Process a diff header line.""" + if self.line.startswith('Binary files '): + self.file.binary = True + elif self.line == '--- /dev/null': + self.file.old.exists = False + elif self.line == '+++ /dev/null': + self.file.new.exists = False + else: + self.file.append_header(self.line) + + def process_content(self) -> None: + """Process a diff content line.""" + if self.line == r'\ No newline at end of file': + if self.previous_line.startswith(' '): + self.file.old.eof_newline = False + self.file.new.eof_newline = False + elif self.previous_line.startswith('-'): + self.file.old.eof_newline = False + elif self.previous_line.startswith('+'): + self.file.new.eof_newline = False + else: + raise Exception('Unexpected previous diff content line.') + + return + + if self.file.is_complete: + self.process_continue() + return + + if self.line.startswith(' '): + self.file.old.append(self.line) + self.file.new.append(self.line) + elif self.line.startswith('-'): + self.file.old.append(self.line) + elif self.line.startswith('+'): + self.file.new.append(self.line) + else: + raise Exception('Unexpected diff content line.') + + def complete_file(self) -> None: + """Complete processing of the current file, if any.""" + if not self.file: + return + + self.files.append(self.file) |