diff options
Diffstat (limited to 'tests/modules/lib')
-rw-r--r-- | tests/modules/lib/__init__.py | 183 | ||||
-rw-r--r-- | tests/modules/lib/config_mock.py | 230 | ||||
-rw-r--r-- | tests/modules/lib/fsconfig.py | 83 | ||||
-rw-r--r-- | tests/modules/lib/terminal.py | 307 | ||||
-rw-r--r-- | tests/modules/lib/vterm.py | 193 |
5 files changed, 996 insertions, 0 deletions
diff --git a/tests/modules/lib/__init__.py b/tests/modules/lib/__init__.py new file mode 100644 index 0000000..f3e36b8 --- /dev/null +++ b/tests/modules/lib/__init__.py @@ -0,0 +1,183 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import imp +import sys + + +class Pl(object): + def __init__(self): + self.exceptions = [] + self.errors = [] + self.warns = [] + self.debugs = [] + self.infos = [] + self.prefix = None + self.use_daemon_threads = True + + for meth in ('error', 'warn', 'debug', 'exception', 'info'): + exec(( + 'def {0}(self, msg, *args, **kwargs):\n' + ' self.{0}s.append((kwargs.get("prefix") or self.prefix, msg, args, kwargs))\n' + ).format(meth)) + + def __nonzero__(self): + return bool(self.exceptions or self.errors or self.warns) + + __bool__ = __nonzero__ + + +class Args(object): + theme_override = {} + config_override = {} + config_path = None + ext = ['shell'] + renderer_module = None + + def __init__(self, **kwargs): + self.__dict__.update(kwargs) + + +def urllib_read(query_url): + if query_url.startswith('http://ipv'): + if query_url.startswith('http://ipv4.icanhazip.com'): + return '127.0.0.1' + elif query_url.startswith('http://ipv6.icanhazip.com'): + return '2001:4801:7818:6:abc5:ba2c:ff10:275f' + elif query_url.startswith('http://geoip.nekudo.com/api/'): + return '{"city":"Meppen","country":{"name":"Germany", "code":"DE"},"location":{"accuracy_radius":100,"latitude":52.6833,"longitude":7.3167,"time_zone":"Europe\/Berlin"},"ip":"82.145.55.16"}' + elif query_url.startswith('http://query.yahooapis.com/v1/public/'): + if 'Meppen' in query_url: + return r'{"query":{"count":1,"created":"2016-05-13T19:43:18Z","lang":"en-US","results":{"channel":{"units":{"distance":"mi","pressure":"in","speed":"mph","temperature":"C"},"title":"Yahoo! Weather - Meppen, NI, DE","link":"http://us.rd.yahoo.com/dailynews/rss/weather/Country__Country/*https://weather.yahoo.com/country/state/city-674836/","description":"Yahoo! Weather for Meppen, NI, DE","language":"en-us","lastBuildDate":"Fri, 13 May 2016 09:43 PM CEST","ttl":"60","location":{"city":"Meppen","country":"Germany","region":" NI"},"wind":{"chill":"55","direction":"350","speed":"25"},"atmosphere":{"humidity":"57","pressure":"1004.0","rising":"0","visibility":"16.1"},"astronomy":{"sunrise":"5:35 am","sunset":"9:21 pm"},"image":{"title":"Yahoo! Weather","width":"142","height":"18","link":"http://weather.yahoo.com","url":"http://l.yimg.com/a/i/brand/purplelogo//uh/us/news-wea.gif"},"item":{"title":"Conditions for Meppen, NI, DE at 08:00 PM CEST","lat":"52.68993","long":"7.29115","link":"http://us.rd.yahoo.com/dailynews/rss/weather/Country__Country/*https://weather.yahoo.com/country/state/city-674836/","pubDate":"Fri, 13 May 2016 08:00 PM CEST","condition":{"code":"23","date":"Fri, 13 May 2016 08:00 PM CEST","temp":"14","text":"Breezy"},"forecast":[{"code":"30","date":"13 May 2016","day":"Fri","high":"71","low":"48","text":"Partly Cloudy"},{"code":"28","date":"14 May 2016","day":"Sat","high":"54","low":"44","text":"Mostly Cloudy"},{"code":"11","date":"15 May 2016","day":"Sun","high":"55","low":"43","text":"Showers"},{"code":"28","date":"16 May 2016","day":"Mon","high":"54","low":"42","text":"Mostly Cloudy"},{"code":"28","date":"17 May 2016","day":"Tue","high":"57","low":"43","text":"Mostly Cloudy"},{"code":"12","date":"18 May 2016","day":"Wed","high":"62","low":"45","text":"Rain"},{"code":"28","date":"19 May 2016","day":"Thu","high":"63","low":"48","text":"Mostly Cloudy"},{"code":"28","date":"20 May 2016","day":"Fri","high":"67","low":"50","text":"Mostly Cloudy"},{"code":"30","date":"21 May 2016","day":"Sat","high":"71","low":"50","text":"Partly Cloudy"},{"code":"30","date":"22 May 2016","day":"Sun","high":"74","low":"54","text":"Partly Cloudy"}],"description":"<![CDATA[<img src=\"http://l.yimg.com/a/i/us/we/52/23.gif\"/>\n<BR />\n<b>Current Conditions:</b>\n<BR />Breezy\n<BR />\n<BR />\n<b>Forecast:</b>\n<BR /> Fri - Partly Cloudy. High: 71Low: 48\n<BR /> Sat - Mostly Cloudy. High: 54Low: 44\n<BR /> Sun - Showers. High: 55Low: 43\n<BR /> Mon - Mostly Cloudy. High: 54Low: 42\n<BR /> Tue - Mostly Cloudy. High: 57Low: 43\n<BR />\n<BR />\n<a href=\"http://us.rd.yahoo.com/dailynews/rss/weather/Country__Country/*https://weather.yahoo.com/country/state/city-674836/\">Full Forecast at Yahoo! Weather</a>\n<BR />\n<BR />\n(provided by <a href=\"http://www.weather.com\" >The Weather Channel</a>)\n<BR />\n]]>","guid":{"isPermaLink":"false"}}}}}}' + elif 'Moscow' in query_url: + return r'{"query":{"count":1,"created":"2016-05-13T19:47:01Z","lang":"en-US","results":{"channel":{"units":{"distance":"mi","pressure":"in","speed":"mph","temperature":"C"},"title":"Yahoo! Weather - Moscow, Moscow Federal City, RU","link":"http://us.rd.yahoo.com/dailynews/rss/weather/Country__Country/*https://weather.yahoo.com/country/state/city-2122265/","description":"Yahoo! Weather for Moscow, Moscow Federal City, RU","language":"en-us","lastBuildDate":"Fri, 13 May 2016 10:47 PM MSK","ttl":"60","location":{"city":"Moscow","country":"Russia","region":" Moscow Federal City"},"wind":{"chill":"45","direction":"80","speed":"11"},"atmosphere":{"humidity":"52","pressure":"993.0","rising":"0","visibility":"16.1"},"astronomy":{"sunrise":"4:19 am","sunset":"8:34 pm"},"image":{"title":"Yahoo! Weather","width":"142","height":"18","link":"http://weather.yahoo.com","url":"http://l.yimg.com/a/i/brand/purplelogo//uh/us/news-wea.gif"},"item":{"title":"Conditions for Moscow, Moscow Federal City, RU at 09:00 PM MSK","lat":"55.741638","long":"37.605061","link":"http://us.rd.yahoo.com/dailynews/rss/weather/Country__Country/*https://weather.yahoo.com/country/state/city-2122265/","pubDate":"Fri, 13 May 2016 09:00 PM MSK","condition":{"code":"33","date":"Fri, 13 May 2016 09:00 PM MSK","temp":"9","text":"Mostly Clear"},"forecast":[{"code":"30","date":"13 May 2016","day":"Fri","high":"62","low":"41","text":"Partly Cloudy"},{"code":"30","date":"14 May 2016","day":"Sat","high":"64","low":"43","text":"Partly Cloudy"},{"code":"30","date":"15 May 2016","day":"Sun","high":"63","low":"44","text":"Partly Cloudy"},{"code":"12","date":"16 May 2016","day":"Mon","high":"60","low":"47","text":"Rain"},{"code":"12","date":"17 May 2016","day":"Tue","high":"64","low":"48","text":"Rain"},{"code":"28","date":"18 May 2016","day":"Wed","high":"67","low":"48","text":"Mostly Cloudy"},{"code":"12","date":"19 May 2016","day":"Thu","high":"68","low":"49","text":"Rain"},{"code":"39","date":"20 May 2016","day":"Fri","high":"66","low":"50","text":"Scattered Showers"},{"code":"39","date":"21 May 2016","day":"Sat","high":"69","low":"49","text":"Scattered Showers"},{"code":"30","date":"22 May 2016","day":"Sun","high":"73","low":"50","text":"Partly Cloudy"}],"description":"<![CDATA[<img src=\"http://l.yimg.com/a/i/us/we/52/33.gif\"/>\n<BR />\n<b>Current Conditions:</b>\n<BR />Mostly Clear\n<BR />\n<BR />\n<b>Forecast:</b>\n<BR /> Fri - Partly Cloudy. High: 62Low: 41\n<BR /> Sat - Partly Cloudy. High: 64Low: 43\n<BR /> Sun - Partly Cloudy. High: 63Low: 44\n<BR /> Mon - Rain. High: 60Low: 47\n<BR /> Tue - Rain. High: 64Low: 48\n<BR />\n<BR />\n<a href=\"http://us.rd.yahoo.com/dailynews/rss/weather/Country__Country/*https://weather.yahoo.com/country/state/city-2122265/\">Full Forecast at Yahoo! Weather</a>\n<BR />\n<BR />\n(provided by <a href=\"http://www.weather.com\" >The Weather Channel</a>)\n<BR />\n]]>","guid":{"isPermaLink":"false"}}}}}}' + else: + raise NotImplementedError + + +class Process(object): + def __init__(self, output, err): + self.output = output + self.err = err + + def communicate(self): + return self.output, self.err + + +class ModuleReplace(object): + def __init__(self, name, new): + self.name = name + self.new = new + + def __enter__(self): + self.old = sys.modules.get(self.name) + if not self.old: + try: + self.old = __import__(self.name) + except ImportError: + pass + sys.modules[self.name] = self.new + + def __exit__(self, *args): + if self.old: + sys.modules[self.name] = self.old + else: + sys.modules.pop(self.name) + + +def replace_module(name, new=None, **kwargs): + if not new: + new = new_module(name, **kwargs) + return ModuleReplace(name, new) + + +def new_module(name, **kwargs): + module = imp.new_module(name) + for k, v in kwargs.items(): + setattr(module, k, v) + return module + + +class AttrReplace(object): + def __init__(self, obj, *args): + self.obj = obj + self.attrs = args[::2] + self.new = args[1::2] + + def __enter__(self): + self.old = {} + for i, attr in enumerate(self.attrs): + try: + self.old[i] = getattr(self.obj, attr) + except AttributeError: + pass + for attr, new in zip(self.attrs, self.new): + setattr(self.obj, attr, new) + + def __exit__(self, *args): + for i, attr in enumerate(self.attrs): + try: + old = self.old[i] + except KeyError: + delattr(self.obj, attr) + else: + setattr(self.obj, attr, old) + + +replace_attr = AttrReplace + + +def replace_module_module(module, name, **kwargs): + return replace_attr(module, name, new_module(name, **kwargs)) + + +class ItemReplace(object): + def __init__(self, d, key, new, r=None): + self.key = key + self.new = new + self.d = d + self.r = r + + def __enter__(self): + self.old = self.d.get(self.key) + self.d[self.key] = self.new + return self.r + + def __exit__(self, *args): + if self.old is None: + try: + self.d.pop(self.key) + except KeyError: + pass + else: + self.d[self.key] = self.old + + +def replace_item(d, key, new): + return ItemReplace(d, key, new, d) + + +def replace_env(key, new, environ=None, **kwargs): + r = kwargs.copy() + r['environ'] = environ or {} + return ItemReplace(r['environ'], key, new, r) + + +class PowerlineSingleTest(object): + def __init__(self, suite, name): + self.suite = suite + self.name = name + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + if exc_type is not None: + self.exception('Exception while running test: {0!r}'.format( + exc_value)) + + def fail(self, message, allow_failure=False): + return self.suite.fail(self.name, message, allow_failure) + + def exception(self, message, allow_failure=False): + return self.suite.exception(self.name, message, allow_failure) diff --git a/tests/modules/lib/config_mock.py b/tests/modules/lib/config_mock.py new file mode 100644 index 0000000..900b60f --- /dev/null +++ b/tests/modules/lib/config_mock.py @@ -0,0 +1,230 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import os + +from threading import Lock +from copy import deepcopy +from time import sleep +from functools import wraps + +from powerline.renderer import Renderer +from powerline.lib.config import ConfigLoader +from powerline import Powerline, get_default_theme + +from tests.modules.lib import Args, replace_attr + + +UT = get_default_theme(is_unicode=True) +AT = get_default_theme(is_unicode=False) + + +class TestHelpers(object): + def __init__(self, config): + self.config = config + self.access_log = [] + self.access_lock = Lock() + + def loader_condition(self, path): + return (path in self.config) and path + + def find_config_files(self, cfg_path, config_loader, loader_callback): + if cfg_path.endswith('.json'): + cfg_path = cfg_path[:-5] + if cfg_path.startswith('/'): + cfg_path = cfg_path.lstrip('/') + with self.access_lock: + self.access_log.append('check:' + cfg_path) + if cfg_path in self.config: + yield cfg_path + else: + if config_loader: + config_loader.register_missing(self.loader_condition, loader_callback, cfg_path) + raise IOError(('fcf:' if cfg_path.endswith('raise') else '') + cfg_path) + + def load_json_config(self, config_file_path, *args, **kwargs): + if config_file_path.endswith('.json'): + config_file_path = config_file_path[:-5] + if config_file_path.startswith('/'): + config_file_path = config_file_path.lstrip('/') + with self.access_lock: + self.access_log.append('load:' + config_file_path) + try: + return deepcopy(self.config[config_file_path]) + except KeyError: + raise IOError(config_file_path) + + def pop_events(self): + with self.access_lock: + r = self.access_log[:] + self.access_log = [] + return r + + +def log_call(func): + @wraps(func) + def ret(self, *args, **kwargs): + self._calls.append((func.__name__, args, kwargs)) + return func(self, *args, **kwargs) + return ret + + +class TestWatcher(object): + events = set() + lock = Lock() + + def __init__(self): + self._calls = [] + + @log_call + def watch(self, file): + pass + + @log_call + def __call__(self, file): + with self.lock: + if file in self.events: + self.events.remove(file) + return True + return False + + def _reset(self, files): + with self.lock: + self.events.clear() + self.events.update(files) + + @log_call + def unsubscribe(self): + pass + + +class Logger(object): + def __init__(self): + self.messages = [] + self.lock = Lock() + + def _add_msg(self, attr, msg): + with self.lock: + self.messages.append(attr + ':' + msg) + + def _pop_msgs(self): + with self.lock: + r = self.messages + self.messages = [] + return r + + def __getattr__(self, attr): + return lambda *args, **kwargs: self._add_msg(attr, *args, **kwargs) + + +class SimpleRenderer(Renderer): + def hlstyle(self, fg=None, bg=None, attrs=None): + return '<{fg} {bg} {attrs}>'.format(fg=fg and fg[0], bg=bg and bg[0], attrs=attrs) + + +class EvenSimplerRenderer(Renderer): + def hlstyle(self, fg=None, bg=None, attrs=None): + return '{{{fg}{bg}{attrs}}}'.format( + fg=fg and fg[0] or '-', + bg=bg and bg[0] or '-', + attrs=attrs if attrs else '', + ) + + +class TestPowerline(Powerline): + _created = False + + def __init__(self, _helpers, **kwargs): + super(TestPowerline, self).__init__(**kwargs) + self._helpers = _helpers + self.find_config_files = _helpers.find_config_files + + @staticmethod + def get_local_themes(local_themes): + return local_themes + + @staticmethod + def get_config_paths(): + return [''] + + def _will_create_renderer(self): + return self.cr_kwargs + + def _pop_events(self): + return self._helpers.pop_events() + + +renderer = EvenSimplerRenderer + + +class TestConfigLoader(ConfigLoader): + def __init__(self, _helpers, **kwargs): + watcher = TestWatcher() + super(TestConfigLoader, self).__init__( + load=_helpers.load_json_config, + watcher=watcher, + watcher_type='test', + **kwargs + ) + + +def get_powerline(config, **kwargs): + helpers = TestHelpers(config) + return get_powerline_raw( + helpers, + TestPowerline, + _helpers=helpers, + ext='test', + renderer_module='tests.modules.lib.config_mock', + logger=Logger(), + **kwargs + ) + + +def select_renderer(simpler_renderer=False): + global renderer + renderer = EvenSimplerRenderer if simpler_renderer else SimpleRenderer + + +def get_powerline_raw(helpers, PowerlineClass, replace_gcp=False, **kwargs): + if not isinstance(helpers, TestHelpers): + helpers = TestHelpers(helpers) + select_renderer(kwargs.pop('simpler_renderer', False)) + + if replace_gcp: + class PowerlineClass(PowerlineClass): + @staticmethod + def get_config_paths(): + return ['/'] + + pl = PowerlineClass( + config_loader=TestConfigLoader( + _helpers=helpers, + run_once=kwargs.get('run_once') + ), + **kwargs + ) + pl._watcher = pl.config_loader.watcher + return pl + + +def swap_attributes(config, powerline_module): + return replace_attr(powerline_module, 'os', Args( + path=Args( + isfile=lambda path: path.lstrip('/').replace('.json', '') in config, + join=os.path.join, + expanduser=lambda path: path, + realpath=lambda path: path, + dirname=os.path.dirname, + ), + environ={}, + )) + + +def add_watcher_events(p, *args, **kwargs): + if isinstance(p._watcher, TestWatcher): + p._watcher._reset(args) + while not p._will_create_renderer(): + sleep(kwargs.get('interval', 0.1)) + if not kwargs.get('wait', True): + return diff --git a/tests/modules/lib/fsconfig.py b/tests/modules/lib/fsconfig.py new file mode 100644 index 0000000..757e874 --- /dev/null +++ b/tests/modules/lib/fsconfig.py @@ -0,0 +1,83 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import os +import json + +from subprocess import check_call +from shutil import rmtree +from itertools import chain + +from powerline import Powerline + + +CONFIG_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'config') + + +class TestPowerline(Powerline): + def __init__(self, _paths, *args, **kwargs): + super(TestPowerline, self).__init__(*args, **kwargs) + self._paths = _paths + + def get_config_paths(self): + return self._paths + + +def mkdir_recursive(directory): + if os.path.isdir(directory): + return + mkdir_recursive(os.path.dirname(directory)) + os.mkdir(directory) + + +class FSTree(object): + __slots__ = ('tree', 'p', 'p_kwargs', 'create_p', 'get_config_paths', 'root') + + def __init__( + self, + tree, + p_kwargs={'run_once': True}, + root=CONFIG_DIR, + get_config_paths=lambda p: (p,), + create_p=False + ): + self.tree = tree + self.root = root + self.get_config_paths = get_config_paths + self.create_p = create_p + self.p = None + self.p_kwargs = p_kwargs + + def __enter__(self, *args): + os.mkdir(self.root) + for k, v in self.tree.items(): + fname = os.path.join(self.root, k) + '.json' + mkdir_recursive(os.path.dirname(fname)) + with open(fname, 'w') as F: + json.dump(v, F) + if self.create_p: + self.p = TestPowerline( + _paths=self.get_config_paths(self.root), + ext='test', + renderer_module='tests.modules.lib.config_mock', + **self.p_kwargs + ) + if os.environ.get('POWERLINE_RUN_LINT_DURING_TESTS'): + try: + check_call(chain(['scripts/powerline-lint'], *[ + ('-p', d) for d in ( + self.p.get_config_paths() if self.p + else self.get_config_paths(self.root) + ) + ])) + except: + self.__exit__() + raise + return self.p and self.p.__enter__(*args) + + def __exit__(self, *args): + try: + rmtree(self.root) + finally: + if self.p: + self.p.__exit__(*args) diff --git a/tests/modules/lib/terminal.py b/tests/modules/lib/terminal.py new file mode 100644 index 0000000..540135d --- /dev/null +++ b/tests/modules/lib/terminal.py @@ -0,0 +1,307 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import threading +import os + +from time import sleep +from itertools import groupby +from signal import SIGKILL +from difflib import ndiff + +import pexpect + +from powerline.lib.unicode import u + +from tests.modules.lib.vterm import VTerm, Dimensions + + +class MutableDimensions(object): + def __init__(self, rows, cols): + super(MutableDimensions, self).__init__() + self._list = [rows, cols] + + def __getitem__(self, idx): + return self._list[idx] + + def __setitem__(self, idx, val): + self._list[idx] = val + + def __iter__(self): + return iter(self._list) + + def __len__(self): + return 2 + + def __nonzero__(self): + return True + + __bool__ = __nonzero__ + + rows = property( + fget = lambda self: self._list[0], + fset = lambda self, val: self._list.__setitem__(0, val), + ) + cols = property( + fget = lambda self: self._list[1], + fset = lambda self, val: self._list.__setitem__(1, val), + ) + + +class ExpectProcess(threading.Thread): + def __init__(self, lib, dim, cmd, args, cwd=None, env=None): + super(ExpectProcess, self).__init__() + self.vterm = VTerm(lib, dim) + self.lock = threading.Lock() + self.dim = Dimensions(*dim) + self.cmd = cmd + self.args = args + self.cwd = cwd + self.env = env + self.buffer = [] + self.child_lock = threading.Lock() + self.shutdown_event = threading.Event() + self.started_event = threading.Event() + + def run(self): + with self.child_lock: + child = pexpect.spawn(self.cmd, self.args, cwd=self.cwd, + env=self.env) + sleep(0.5) + child.setwinsize(self.dim.rows, self.dim.cols) + sleep(0.5) + self.child = child + self.started_event.set() + status = None + while status is None and not self.shutdown_event.is_set(): + try: + with self.child_lock: + s = child.read_nonblocking(size=1024, timeout=0) + status = child.status + except pexpect.TIMEOUT: + pass + except pexpect.EOF: + break + else: + with self.lock: + self.vterm.push(s) + self.buffer.append(s) + + if status is None: + child.kill(SIGKILL) + + def kill(self): + self.shutdown_event.set() + + def resize(self, dim): + with self.child_lock: + self.dim = Dimensions(*dim) + self.child.setwinsize(self.dim.rows, self.dim.cols) + self.vterm.resize(self.dim) + + def __getitem__(self, position): + with self.lock: + return self.vterm.vtscreen[position] + + def read(self): + with self.lock: + ret = b''.join(self.buffer) + del self.buffer[:] + return ret + + def send(self, data): + with self.child_lock: + self.child.send(data) + + def get_highlighted_text(self, text, attrs, default_props=(), + use_escapes=False): + ret = [] + new_attrs = attrs.copy() + for cell_properties, segment_text in text: + if use_escapes: + escapes = ('\033[38;2;{0};{1};{2};48;2;{3};{4};{5}'.format( + *(cell_properties[0] + cell_properties[1]))) + ( + ';1' if cell_properties[2] else '' + ) + ( + ';3' if cell_properties[3] else '' + ) + ( + ';4' if cell_properties[4] else '' + ) + 'm' + ret.append(escapes + segment_text + '\033[0m') + else: + segment_text = segment_text.translate({'{': '{{', '}': '}}'}) + if cell_properties not in new_attrs: + new_attrs[cell_properties] = len(new_attrs) + 1 + props_name = new_attrs[cell_properties] + if props_name in default_props: + ret.append(segment_text) + else: + ret.append('{' + str(props_name) + ':' + segment_text + '}') + return ''.join(ret), new_attrs + + def get_row(self, row, attrs, default_props=(), use_escapes=False): + with self.lock: + return self.get_highlighted_text(( + (key, ''.join((cell.text for cell in subline))) + for key, subline in groupby(( + self.vterm.vtscreen[row, col] + for col in range(self.dim.cols) + ), lambda cell: cell.cell_properties_key) + ), attrs, default_props, use_escapes) + + def get_screen(self, attrs, default_props=(), use_escapes=False): + lines = [] + for row in range(self.dim.rows): + line, attrs = self.get_row(row, attrs, default_props, use_escapes) + lines.append(line) + return '\n'.join(lines), attrs + + +def test_expected_result(p, test, last_attempt, last_attempt_cb, attempts): + debugging_tests = not not os.environ.get('_POWERLINE_DEBUGGING_TESTS') + expected_text, attrs = test['expected_result'] + result = None + while attempts: + if 'row' in test: + row = test['row'] + else: + row = p.dim.rows - 1 + while row >= 0 and not p[row, 0].text: + row -= 1 + if row < 0: + row = 0 + actual_text, all_attrs = p.get_row(row, attrs) + if actual_text == expected_text: + return True + attempts -= 1 + print('Actual result does not match expected for row {0}. Attempts left: {1}.'.format( + row, attempts)) + sleep(2) + print('Result (row {0}):'.format(row)) + print(actual_text) + print('Expected:') + print(expected_text) + print('Attributes:') + for v, k in sorted( + ((v, k) for k, v in all_attrs.items()), + key=(lambda t: '%02u'.format(t[0]) if isinstance(t[0], int) else t[0]), + ): + print('{k!r}: {v!r},'.format(v=v, k=k)) + print('Screen:') + screen, screen_attrs = p.get_screen(attrs, use_escapes=debugging_tests) + print(screen) + print(screen_attrs) + print('_' * 80) + print('Diff:') + print('=' * 80) + print(''.join(( + u(line) for line in ndiff([actual_text + '\n'], [expected_text + '\n'])) + )) + if last_attempt and last_attempt_cb: + last_attempt_cb() + return False + + +ENV_BASE = { + # Reasoning: + # 1. vt* TERMs (used to be vt100 here) make tmux-1.9 use different and + # identical colors for inactive windows. This is not like tmux-1.6: + # foreground color is different from separator color and equal to (0, + # 102, 153) for some reason (separator has correct color). tmux-1.8 is + # fine, so are older versions (though tmux-1.6 and tmux-1.7 do not have + # highlighting for previously active window) and my system tmux-1.9a. + # 2. screen, xterm and some other non-256color terminals both have the same + # issue and make libvterm emit complains like `Unhandled CSI SGR 3231`. + # 3. screen-256color, xterm-256color and other -256color terminals make + # libvterm emit complains about unhandled escapes to stderr. + # 4. `st-256color` does not have any of the above problems, but it may be + # not present on the target system because it is installed with + # x11-terms/st and not with sys-libs/ncurses. + # + # For the given reasons decision was made: to fix tmux-1.9 tests and not + # make libvterm emit any data to stderr st-256color $TERM should be used, up + # until libvterm has its own terminfo database entry (if it ever will). To + # make sure that relevant terminfo entry is present on the target system it + # should be distributed with powerline test package. To make distribution + # not require modifying anything outside of powerline test directory + # TERMINFO variable is set. + # + # This fix propagates to non-tmux vterm tests just in case. + 'TERM': 'st-256color', + # Also $TERMINFO definition in get_env + + 'POWERLINE_CONFIG_PATHS': os.path.abspath('powerline/config_files'), + 'POWERLINE_COMMAND': 'powerline-render', + 'LD_LIBRARY_PATH': os.environ.get('LD_LIBRARY_PATH', ''), + 'PYTHONPATH': os.environ.get('PYTHONPATH', ''), +} + + +def get_env(vterm_path, test_dir, *args, **kwargs): + env = ENV_BASE.copy() + env.update({ + 'TERMINFO': os.path.join(test_dir, 'terminfo'), + 'PATH': vterm_path, + 'SHELL': os.path.join(vterm_path, 'bash'), + }) + env.update(*args, **kwargs) + return env + + +def do_terminal_tests(tests, cmd, dim, args, env, suite, cwd=None, fin_cb=None, + last_attempt_cb=None, attempts=None): + debugging_tests = not not os.environ.get('_POWERLINE_DEBUGGING_TESTS') + default_attempts = 2 if debugging_tests else 3 + if attempts is None: + attempts = default_attempts + lib = os.environ.get('POWERLINE_LIBVTERM') + if not lib: + if os.path.exists('tests/bot-ci/deps/libvterm/libvterm.so'): + lib = 'tests/bot-ci/deps/libvterm/libvterm.so' + else: + lib = 'libvterm.so' + + while attempts: + try: + p = ExpectProcess( + lib=lib, + dim=dim, + cmd=cmd, + args=args, + cwd=cwd, + env=env, + ) + p.start() + p.started_event.wait() + + ret = True + + for i, test in enumerate(tests): + with suite.test(test.get('name', 'test_{0}'.format(i)), + attempts - 1) as ptest: + try: + test_prep = test['prep_cb'] + except KeyError: + pass + else: + test_prep(p) + test_result = test_expected_result( + p, test, attempts == 0, last_attempt_cb, + test.get('attempts', default_attempts) + ) + if not test_result: + ptest.fail('Result does not match expected') + ret = ret and test_result + + if ret: + return ret + finally: + if fin_cb: + fin_cb(p=p, cmd=cmd, env=env) + p.kill() + p.join(10) + assert(not p.isAlive()) + + attempts -= 1 + + return False diff --git a/tests/modules/lib/vterm.py b/tests/modules/lib/vterm.py new file mode 100644 index 0000000..1984e1b --- /dev/null +++ b/tests/modules/lib/vterm.py @@ -0,0 +1,193 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import ctypes + +from collections import namedtuple + +from powerline.lib.unicode import unicode, unichr, tointiter + + +Dimensions = namedtuple('Dimensions', ('rows', 'cols')) + + +class CTypesFunction(object): + def __init__(self, library, name, rettype, args): + self.name = name + self.prototype = ctypes.CFUNCTYPE(rettype, *[ + arg[1] for arg in args + ]) + self.args = args + self.func = self.prototype((name, library), tuple(( + (1, arg[0]) for arg in args + ))) + + def __call__(self, *args, **kwargs): + return self.func(*args, **kwargs) + + def __repr__(self): + return '{cls}(<library>, {name!r}, {rettype!r}, {args!r})'.format( + cls=self.__class__.__name__, + **self.__dict__ + ) + + +class CTypesLibraryFuncsCollection(object): + def __init__(self, lib, **kwargs): + self.lib = lib + library_loader = ctypes.LibraryLoader(ctypes.CDLL) + library = library_loader.LoadLibrary(lib) + self.library = library + for name, args in kwargs.items(): + self.__dict__[name] = CTypesFunction(library, name, *args) + + +class VTermPos_s(ctypes.Structure): + _fields_ = ( + ('row', ctypes.c_int), + ('col', ctypes.c_int), + ) + + +class VTermColor_s(ctypes.Structure): + _fields_ = ( + ('red', ctypes.c_uint8), + ('green', ctypes.c_uint8), + ('blue', ctypes.c_uint8), + ) + + +class VTermScreenCellAttrs_s(ctypes.Structure): + _fields_ = ( + ('bold', ctypes.c_uint, 1), + ('underline', ctypes.c_uint, 2), + ('italic', ctypes.c_uint, 1), + ('blink', ctypes.c_uint, 1), + ('reverse', ctypes.c_uint, 1), + ('strike', ctypes.c_uint, 1), + ('font', ctypes.c_uint, 4), + ('dwl', ctypes.c_uint, 1), + ('dhl', ctypes.c_uint, 2), + ) + + +VTERM_MAX_CHARS_PER_CELL = 6 + + +class VTermScreenCell_s(ctypes.Structure): + _fields_ = ( + ('chars', ctypes.ARRAY(ctypes.c_uint32, VTERM_MAX_CHARS_PER_CELL)), + ('width', ctypes.c_char), + ('attrs', VTermScreenCellAttrs_s), + ('fg', VTermColor_s), + ('bg', VTermColor_s), + ) + + +VTerm_p = ctypes.c_void_p +VTermScreen_p = ctypes.c_void_p + + +def get_functions(lib): + return CTypesLibraryFuncsCollection( + lib, + vterm_new=(VTerm_p, ( + ('rows', ctypes.c_int), + ('cols', ctypes.c_int) + )), + vterm_obtain_screen=(VTermScreen_p, (('vt', VTerm_p),)), + vterm_set_size=(None, ( + ('vt', VTerm_p), + ('rows', ctypes.c_int), + ('cols', ctypes.c_int) + )), + vterm_screen_reset=(None, ( + ('screen', VTermScreen_p), + ('hard', ctypes.c_int) + )), + vterm_input_write=(ctypes.c_size_t, ( + ('vt', VTerm_p), + ('bytes', ctypes.POINTER(ctypes.c_char)), + ('size', ctypes.c_size_t), + )), + vterm_screen_get_cell=(ctypes.c_int, ( + ('screen', VTermScreen_p), + ('pos', VTermPos_s), + ('cell', ctypes.POINTER(VTermScreenCell_s)) + )), + vterm_free=(None, (('vt', VTerm_p),)), + vterm_set_utf8=(None, (('vt', VTerm_p), ('is_utf8', ctypes.c_int))), + ) + + +class VTermColor(object): + __slots__ = ('red', 'green', 'blue') + + def __init__(self, color): + self.red = color.red + self.green = color.green + self.blue = color.blue + + @property + def color_key(self): + return (self.red, self.green, self.blue) + + +class VTermScreenCell(object): + def __init__(self, vtsc): + for field in VTermScreenCellAttrs_s._fields_: + field_name = field[0] + setattr(self, field_name, getattr(vtsc.attrs, field_name)) + self.text = ''.join(( + unichr(vtsc.chars[i]) for i in range(VTERM_MAX_CHARS_PER_CELL) + )).rstrip('\x00') + self.width = next(tointiter(vtsc.width)) + self.fg = VTermColor(vtsc.fg) + self.bg = VTermColor(vtsc.bg) + self.cell_properties_key = ( + self.fg.color_key, + self.bg.color_key, + self.bold, + self.underline, + self.italic, + ) + + +class VTermScreen(object): + def __init__(self, functions, screen): + self.functions = functions + self.screen = screen + + def __getitem__(self, position): + pos = VTermPos_s(*position) + cell = VTermScreenCell_s() + ret = self.functions.vterm_screen_get_cell(self.screen, pos, cell) + if ret != 1: + raise ValueError('vterm_screen_get_cell returned {0}'.format(ret)) + return VTermScreenCell(cell) + + def reset(self, hard): + self.functions.vterm_screen_reset(self.screen, int(bool(hard))) + + +class VTerm(object): + def __init__(self, lib, dim): + self.functions = get_functions(lib) + self.vt = self.functions.vterm_new(dim.rows, dim.cols) + self.functions.vterm_set_utf8(self.vt, 1) + self.vtscreen = VTermScreen(self.functions, self.functions.vterm_obtain_screen(self.vt)) + self.vtscreen.reset(True) + + def push(self, data): + if isinstance(data, unicode): + data = data.encode('utf-8') + return self.functions.vterm_input_write(self.vt, data, len(data)) + + def resize(self, dim): + self.functions.vterm_set_size(self.vt, dim.rows, dim.cols) + + def __del__(self): + try: + self.functions.vterm_free(self.vt) + except AttributeError: + pass |