summaryrefslogtreecommitdiffstats
path: root/tests/modules/lib/terminal.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-06 02:04:06 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-06 02:04:06 +0000
commita8637fe80c24fb04bf6cc8ae8459877535f1341b (patch)
tree5d263b4543e10940f5e9a79a8fe981c5a3414bd7 /tests/modules/lib/terminal.py
parentInitial commit. (diff)
downloadpowerline-a8637fe80c24fb04bf6cc8ae8459877535f1341b.tar.xz
powerline-a8637fe80c24fb04bf6cc8ae8459877535f1341b.zip
Adding upstream version 2.7.upstream/2.7upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'tests/modules/lib/terminal.py')
-rw-r--r--tests/modules/lib/terminal.py307
1 files changed, 307 insertions, 0 deletions
diff --git a/tests/modules/lib/terminal.py b/tests/modules/lib/terminal.py
new file mode 100644
index 0000000..540135d
--- /dev/null
+++ b/tests/modules/lib/terminal.py
@@ -0,0 +1,307 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import threading
+import os
+
+from time import sleep
+from itertools import groupby
+from signal import SIGKILL
+from difflib import ndiff
+
+import pexpect
+
+from powerline.lib.unicode import u
+
+from tests.modules.lib.vterm import VTerm, Dimensions
+
+
+class MutableDimensions(object):
+ def __init__(self, rows, cols):
+ super(MutableDimensions, self).__init__()
+ self._list = [rows, cols]
+
+ def __getitem__(self, idx):
+ return self._list[idx]
+
+ def __setitem__(self, idx, val):
+ self._list[idx] = val
+
+ def __iter__(self):
+ return iter(self._list)
+
+ def __len__(self):
+ return 2
+
+ def __nonzero__(self):
+ return True
+
+ __bool__ = __nonzero__
+
+ rows = property(
+ fget = lambda self: self._list[0],
+ fset = lambda self, val: self._list.__setitem__(0, val),
+ )
+ cols = property(
+ fget = lambda self: self._list[1],
+ fset = lambda self, val: self._list.__setitem__(1, val),
+ )
+
+
+class ExpectProcess(threading.Thread):
+ def __init__(self, lib, dim, cmd, args, cwd=None, env=None):
+ super(ExpectProcess, self).__init__()
+ self.vterm = VTerm(lib, dim)
+ self.lock = threading.Lock()
+ self.dim = Dimensions(*dim)
+ self.cmd = cmd
+ self.args = args
+ self.cwd = cwd
+ self.env = env
+ self.buffer = []
+ self.child_lock = threading.Lock()
+ self.shutdown_event = threading.Event()
+ self.started_event = threading.Event()
+
+ def run(self):
+ with self.child_lock:
+ child = pexpect.spawn(self.cmd, self.args, cwd=self.cwd,
+ env=self.env)
+ sleep(0.5)
+ child.setwinsize(self.dim.rows, self.dim.cols)
+ sleep(0.5)
+ self.child = child
+ self.started_event.set()
+ status = None
+ while status is None and not self.shutdown_event.is_set():
+ try:
+ with self.child_lock:
+ s = child.read_nonblocking(size=1024, timeout=0)
+ status = child.status
+ except pexpect.TIMEOUT:
+ pass
+ except pexpect.EOF:
+ break
+ else:
+ with self.lock:
+ self.vterm.push(s)
+ self.buffer.append(s)
+
+ if status is None:
+ child.kill(SIGKILL)
+
+ def kill(self):
+ self.shutdown_event.set()
+
+ def resize(self, dim):
+ with self.child_lock:
+ self.dim = Dimensions(*dim)
+ self.child.setwinsize(self.dim.rows, self.dim.cols)
+ self.vterm.resize(self.dim)
+
+ def __getitem__(self, position):
+ with self.lock:
+ return self.vterm.vtscreen[position]
+
+ def read(self):
+ with self.lock:
+ ret = b''.join(self.buffer)
+ del self.buffer[:]
+ return ret
+
+ def send(self, data):
+ with self.child_lock:
+ self.child.send(data)
+
+ def get_highlighted_text(self, text, attrs, default_props=(),
+ use_escapes=False):
+ ret = []
+ new_attrs = attrs.copy()
+ for cell_properties, segment_text in text:
+ if use_escapes:
+ escapes = ('\033[38;2;{0};{1};{2};48;2;{3};{4};{5}'.format(
+ *(cell_properties[0] + cell_properties[1]))) + (
+ ';1' if cell_properties[2] else ''
+ ) + (
+ ';3' if cell_properties[3] else ''
+ ) + (
+ ';4' if cell_properties[4] else ''
+ ) + 'm'
+ ret.append(escapes + segment_text + '\033[0m')
+ else:
+ segment_text = segment_text.translate({'{': '{{', '}': '}}'})
+ if cell_properties not in new_attrs:
+ new_attrs[cell_properties] = len(new_attrs) + 1
+ props_name = new_attrs[cell_properties]
+ if props_name in default_props:
+ ret.append(segment_text)
+ else:
+ ret.append('{' + str(props_name) + ':' + segment_text + '}')
+ return ''.join(ret), new_attrs
+
+ def get_row(self, row, attrs, default_props=(), use_escapes=False):
+ with self.lock:
+ return self.get_highlighted_text((
+ (key, ''.join((cell.text for cell in subline)))
+ for key, subline in groupby((
+ self.vterm.vtscreen[row, col]
+ for col in range(self.dim.cols)
+ ), lambda cell: cell.cell_properties_key)
+ ), attrs, default_props, use_escapes)
+
+ def get_screen(self, attrs, default_props=(), use_escapes=False):
+ lines = []
+ for row in range(self.dim.rows):
+ line, attrs = self.get_row(row, attrs, default_props, use_escapes)
+ lines.append(line)
+ return '\n'.join(lines), attrs
+
+
+def test_expected_result(p, test, last_attempt, last_attempt_cb, attempts):
+ debugging_tests = not not os.environ.get('_POWERLINE_DEBUGGING_TESTS')
+ expected_text, attrs = test['expected_result']
+ result = None
+ while attempts:
+ if 'row' in test:
+ row = test['row']
+ else:
+ row = p.dim.rows - 1
+ while row >= 0 and not p[row, 0].text:
+ row -= 1
+ if row < 0:
+ row = 0
+ actual_text, all_attrs = p.get_row(row, attrs)
+ if actual_text == expected_text:
+ return True
+ attempts -= 1
+ print('Actual result does not match expected for row {0}. Attempts left: {1}.'.format(
+ row, attempts))
+ sleep(2)
+ print('Result (row {0}):'.format(row))
+ print(actual_text)
+ print('Expected:')
+ print(expected_text)
+ print('Attributes:')
+ for v, k in sorted(
+ ((v, k) for k, v in all_attrs.items()),
+ key=(lambda t: '%02u'.format(t[0]) if isinstance(t[0], int) else t[0]),
+ ):
+ print('{k!r}: {v!r},'.format(v=v, k=k))
+ print('Screen:')
+ screen, screen_attrs = p.get_screen(attrs, use_escapes=debugging_tests)
+ print(screen)
+ print(screen_attrs)
+ print('_' * 80)
+ print('Diff:')
+ print('=' * 80)
+ print(''.join((
+ u(line) for line in ndiff([actual_text + '\n'], [expected_text + '\n']))
+ ))
+ if last_attempt and last_attempt_cb:
+ last_attempt_cb()
+ return False
+
+
+ENV_BASE = {
+ # Reasoning:
+ # 1. vt* TERMs (used to be vt100 here) make tmux-1.9 use different and
+ # identical colors for inactive windows. This is not like tmux-1.6:
+ # foreground color is different from separator color and equal to (0,
+ # 102, 153) for some reason (separator has correct color). tmux-1.8 is
+ # fine, so are older versions (though tmux-1.6 and tmux-1.7 do not have
+ # highlighting for previously active window) and my system tmux-1.9a.
+ # 2. screen, xterm and some other non-256color terminals both have the same
+ # issue and make libvterm emit complains like `Unhandled CSI SGR 3231`.
+ # 3. screen-256color, xterm-256color and other -256color terminals make
+ # libvterm emit complains about unhandled escapes to stderr.
+ # 4. `st-256color` does not have any of the above problems, but it may be
+ # not present on the target system because it is installed with
+ # x11-terms/st and not with sys-libs/ncurses.
+ #
+ # For the given reasons decision was made: to fix tmux-1.9 tests and not
+ # make libvterm emit any data to stderr st-256color $TERM should be used, up
+ # until libvterm has its own terminfo database entry (if it ever will). To
+ # make sure that relevant terminfo entry is present on the target system it
+ # should be distributed with powerline test package. To make distribution
+ # not require modifying anything outside of powerline test directory
+ # TERMINFO variable is set.
+ #
+ # This fix propagates to non-tmux vterm tests just in case.
+ 'TERM': 'st-256color',
+ # Also $TERMINFO definition in get_env
+
+ 'POWERLINE_CONFIG_PATHS': os.path.abspath('powerline/config_files'),
+ 'POWERLINE_COMMAND': 'powerline-render',
+ 'LD_LIBRARY_PATH': os.environ.get('LD_LIBRARY_PATH', ''),
+ 'PYTHONPATH': os.environ.get('PYTHONPATH', ''),
+}
+
+
+def get_env(vterm_path, test_dir, *args, **kwargs):
+ env = ENV_BASE.copy()
+ env.update({
+ 'TERMINFO': os.path.join(test_dir, 'terminfo'),
+ 'PATH': vterm_path,
+ 'SHELL': os.path.join(vterm_path, 'bash'),
+ })
+ env.update(*args, **kwargs)
+ return env
+
+
+def do_terminal_tests(tests, cmd, dim, args, env, suite, cwd=None, fin_cb=None,
+ last_attempt_cb=None, attempts=None):
+ debugging_tests = not not os.environ.get('_POWERLINE_DEBUGGING_TESTS')
+ default_attempts = 2 if debugging_tests else 3
+ if attempts is None:
+ attempts = default_attempts
+ lib = os.environ.get('POWERLINE_LIBVTERM')
+ if not lib:
+ if os.path.exists('tests/bot-ci/deps/libvterm/libvterm.so'):
+ lib = 'tests/bot-ci/deps/libvterm/libvterm.so'
+ else:
+ lib = 'libvterm.so'
+
+ while attempts:
+ try:
+ p = ExpectProcess(
+ lib=lib,
+ dim=dim,
+ cmd=cmd,
+ args=args,
+ cwd=cwd,
+ env=env,
+ )
+ p.start()
+ p.started_event.wait()
+
+ ret = True
+
+ for i, test in enumerate(tests):
+ with suite.test(test.get('name', 'test_{0}'.format(i)),
+ attempts - 1) as ptest:
+ try:
+ test_prep = test['prep_cb']
+ except KeyError:
+ pass
+ else:
+ test_prep(p)
+ test_result = test_expected_result(
+ p, test, attempts == 0, last_attempt_cb,
+ test.get('attempts', default_attempts)
+ )
+ if not test_result:
+ ptest.fail('Result does not match expected')
+ ret = ret and test_result
+
+ if ret:
+ return ret
+ finally:
+ if fin_cb:
+ fin_cb(p=p, cmd=cmd, env=env)
+ p.kill()
+ p.join(10)
+ assert(not p.isAlive())
+
+ attempts -= 1
+
+ return False