summaryrefslogtreecommitdiffstats
path: root/tests/modules/lib
diff options
context:
space:
mode:
Diffstat (limited to 'tests/modules/lib')
-rw-r--r--tests/modules/lib/__init__.py183
-rw-r--r--tests/modules/lib/config_mock.py230
-rw-r--r--tests/modules/lib/fsconfig.py83
-rw-r--r--tests/modules/lib/terminal.py307
-rw-r--r--tests/modules/lib/vterm.py193
5 files changed, 996 insertions, 0 deletions
diff --git a/tests/modules/lib/__init__.py b/tests/modules/lib/__init__.py
new file mode 100644
index 0000000..f3e36b8
--- /dev/null
+++ b/tests/modules/lib/__init__.py
@@ -0,0 +1,183 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import imp
+import sys
+
+
+class Pl(object):
+ def __init__(self):
+ self.exceptions = []
+ self.errors = []
+ self.warns = []
+ self.debugs = []
+ self.infos = []
+ self.prefix = None
+ self.use_daemon_threads = True
+
+ for meth in ('error', 'warn', 'debug', 'exception', 'info'):
+ exec((
+ 'def {0}(self, msg, *args, **kwargs):\n'
+ ' self.{0}s.append((kwargs.get("prefix") or self.prefix, msg, args, kwargs))\n'
+ ).format(meth))
+
+ def __nonzero__(self):
+ return bool(self.exceptions or self.errors or self.warns)
+
+ __bool__ = __nonzero__
+
+
+class Args(object):
+ theme_override = {}
+ config_override = {}
+ config_path = None
+ ext = ['shell']
+ renderer_module = None
+
+ def __init__(self, **kwargs):
+ self.__dict__.update(kwargs)
+
+
+def urllib_read(query_url):
+ if query_url.startswith('http://ipv'):
+ if query_url.startswith('http://ipv4.icanhazip.com'):
+ return '127.0.0.1'
+ elif query_url.startswith('http://ipv6.icanhazip.com'):
+ return '2001:4801:7818:6:abc5:ba2c:ff10:275f'
+ elif query_url.startswith('http://geoip.nekudo.com/api/'):
+ return '{"city":"Meppen","country":{"name":"Germany", "code":"DE"},"location":{"accuracy_radius":100,"latitude":52.6833,"longitude":7.3167,"time_zone":"Europe\/Berlin"},"ip":"82.145.55.16"}'
+ elif query_url.startswith('http://query.yahooapis.com/v1/public/'):
+ if 'Meppen' in query_url:
+ return r'{"query":{"count":1,"created":"2016-05-13T19:43:18Z","lang":"en-US","results":{"channel":{"units":{"distance":"mi","pressure":"in","speed":"mph","temperature":"C"},"title":"Yahoo! Weather - Meppen, NI, DE","link":"http://us.rd.yahoo.com/dailynews/rss/weather/Country__Country/*https://weather.yahoo.com/country/state/city-674836/","description":"Yahoo! Weather for Meppen, NI, DE","language":"en-us","lastBuildDate":"Fri, 13 May 2016 09:43 PM CEST","ttl":"60","location":{"city":"Meppen","country":"Germany","region":" NI"},"wind":{"chill":"55","direction":"350","speed":"25"},"atmosphere":{"humidity":"57","pressure":"1004.0","rising":"0","visibility":"16.1"},"astronomy":{"sunrise":"5:35 am","sunset":"9:21 pm"},"image":{"title":"Yahoo! Weather","width":"142","height":"18","link":"http://weather.yahoo.com","url":"http://l.yimg.com/a/i/brand/purplelogo//uh/us/news-wea.gif"},"item":{"title":"Conditions for Meppen, NI, DE at 08:00 PM CEST","lat":"52.68993","long":"7.29115","link":"http://us.rd.yahoo.com/dailynews/rss/weather/Country__Country/*https://weather.yahoo.com/country/state/city-674836/","pubDate":"Fri, 13 May 2016 08:00 PM CEST","condition":{"code":"23","date":"Fri, 13 May 2016 08:00 PM CEST","temp":"14","text":"Breezy"},"forecast":[{"code":"30","date":"13 May 2016","day":"Fri","high":"71","low":"48","text":"Partly Cloudy"},{"code":"28","date":"14 May 2016","day":"Sat","high":"54","low":"44","text":"Mostly Cloudy"},{"code":"11","date":"15 May 2016","day":"Sun","high":"55","low":"43","text":"Showers"},{"code":"28","date":"16 May 2016","day":"Mon","high":"54","low":"42","text":"Mostly Cloudy"},{"code":"28","date":"17 May 2016","day":"Tue","high":"57","low":"43","text":"Mostly Cloudy"},{"code":"12","date":"18 May 2016","day":"Wed","high":"62","low":"45","text":"Rain"},{"code":"28","date":"19 May 2016","day":"Thu","high":"63","low":"48","text":"Mostly Cloudy"},{"code":"28","date":"20 May 2016","day":"Fri","high":"67","low":"50","text":"Mostly Cloudy"},{"code":"30","date":"21 May 2016","day":"Sat","high":"71","low":"50","text":"Partly Cloudy"},{"code":"30","date":"22 May 2016","day":"Sun","high":"74","low":"54","text":"Partly Cloudy"}],"description":"<![CDATA[<img src=\"http://l.yimg.com/a/i/us/we/52/23.gif\"/>\n<BR />\n<b>Current Conditions:</b>\n<BR />Breezy\n<BR />\n<BR />\n<b>Forecast:</b>\n<BR /> Fri - Partly Cloudy. High: 71Low: 48\n<BR /> Sat - Mostly Cloudy. High: 54Low: 44\n<BR /> Sun - Showers. High: 55Low: 43\n<BR /> Mon - Mostly Cloudy. High: 54Low: 42\n<BR /> Tue - Mostly Cloudy. High: 57Low: 43\n<BR />\n<BR />\n<a href=\"http://us.rd.yahoo.com/dailynews/rss/weather/Country__Country/*https://weather.yahoo.com/country/state/city-674836/\">Full Forecast at Yahoo! Weather</a>\n<BR />\n<BR />\n(provided by <a href=\"http://www.weather.com\" >The Weather Channel</a>)\n<BR />\n]]>","guid":{"isPermaLink":"false"}}}}}}'
+ elif 'Moscow' in query_url:
+ return r'{"query":{"count":1,"created":"2016-05-13T19:47:01Z","lang":"en-US","results":{"channel":{"units":{"distance":"mi","pressure":"in","speed":"mph","temperature":"C"},"title":"Yahoo! Weather - Moscow, Moscow Federal City, RU","link":"http://us.rd.yahoo.com/dailynews/rss/weather/Country__Country/*https://weather.yahoo.com/country/state/city-2122265/","description":"Yahoo! Weather for Moscow, Moscow Federal City, RU","language":"en-us","lastBuildDate":"Fri, 13 May 2016 10:47 PM MSK","ttl":"60","location":{"city":"Moscow","country":"Russia","region":" Moscow Federal City"},"wind":{"chill":"45","direction":"80","speed":"11"},"atmosphere":{"humidity":"52","pressure":"993.0","rising":"0","visibility":"16.1"},"astronomy":{"sunrise":"4:19 am","sunset":"8:34 pm"},"image":{"title":"Yahoo! Weather","width":"142","height":"18","link":"http://weather.yahoo.com","url":"http://l.yimg.com/a/i/brand/purplelogo//uh/us/news-wea.gif"},"item":{"title":"Conditions for Moscow, Moscow Federal City, RU at 09:00 PM MSK","lat":"55.741638","long":"37.605061","link":"http://us.rd.yahoo.com/dailynews/rss/weather/Country__Country/*https://weather.yahoo.com/country/state/city-2122265/","pubDate":"Fri, 13 May 2016 09:00 PM MSK","condition":{"code":"33","date":"Fri, 13 May 2016 09:00 PM MSK","temp":"9","text":"Mostly Clear"},"forecast":[{"code":"30","date":"13 May 2016","day":"Fri","high":"62","low":"41","text":"Partly Cloudy"},{"code":"30","date":"14 May 2016","day":"Sat","high":"64","low":"43","text":"Partly Cloudy"},{"code":"30","date":"15 May 2016","day":"Sun","high":"63","low":"44","text":"Partly Cloudy"},{"code":"12","date":"16 May 2016","day":"Mon","high":"60","low":"47","text":"Rain"},{"code":"12","date":"17 May 2016","day":"Tue","high":"64","low":"48","text":"Rain"},{"code":"28","date":"18 May 2016","day":"Wed","high":"67","low":"48","text":"Mostly Cloudy"},{"code":"12","date":"19 May 2016","day":"Thu","high":"68","low":"49","text":"Rain"},{"code":"39","date":"20 May 2016","day":"Fri","high":"66","low":"50","text":"Scattered Showers"},{"code":"39","date":"21 May 2016","day":"Sat","high":"69","low":"49","text":"Scattered Showers"},{"code":"30","date":"22 May 2016","day":"Sun","high":"73","low":"50","text":"Partly Cloudy"}],"description":"<![CDATA[<img src=\"http://l.yimg.com/a/i/us/we/52/33.gif\"/>\n<BR />\n<b>Current Conditions:</b>\n<BR />Mostly Clear\n<BR />\n<BR />\n<b>Forecast:</b>\n<BR /> Fri - Partly Cloudy. High: 62Low: 41\n<BR /> Sat - Partly Cloudy. High: 64Low: 43\n<BR /> Sun - Partly Cloudy. High: 63Low: 44\n<BR /> Mon - Rain. High: 60Low: 47\n<BR /> Tue - Rain. High: 64Low: 48\n<BR />\n<BR />\n<a href=\"http://us.rd.yahoo.com/dailynews/rss/weather/Country__Country/*https://weather.yahoo.com/country/state/city-2122265/\">Full Forecast at Yahoo! Weather</a>\n<BR />\n<BR />\n(provided by <a href=\"http://www.weather.com\" >The Weather Channel</a>)\n<BR />\n]]>","guid":{"isPermaLink":"false"}}}}}}'
+ else:
+ raise NotImplementedError
+
+
+class Process(object):
+ def __init__(self, output, err):
+ self.output = output
+ self.err = err
+
+ def communicate(self):
+ return self.output, self.err
+
+
+class ModuleReplace(object):
+ def __init__(self, name, new):
+ self.name = name
+ self.new = new
+
+ def __enter__(self):
+ self.old = sys.modules.get(self.name)
+ if not self.old:
+ try:
+ self.old = __import__(self.name)
+ except ImportError:
+ pass
+ sys.modules[self.name] = self.new
+
+ def __exit__(self, *args):
+ if self.old:
+ sys.modules[self.name] = self.old
+ else:
+ sys.modules.pop(self.name)
+
+
+def replace_module(name, new=None, **kwargs):
+ if not new:
+ new = new_module(name, **kwargs)
+ return ModuleReplace(name, new)
+
+
+def new_module(name, **kwargs):
+ module = imp.new_module(name)
+ for k, v in kwargs.items():
+ setattr(module, k, v)
+ return module
+
+
+class AttrReplace(object):
+ def __init__(self, obj, *args):
+ self.obj = obj
+ self.attrs = args[::2]
+ self.new = args[1::2]
+
+ def __enter__(self):
+ self.old = {}
+ for i, attr in enumerate(self.attrs):
+ try:
+ self.old[i] = getattr(self.obj, attr)
+ except AttributeError:
+ pass
+ for attr, new in zip(self.attrs, self.new):
+ setattr(self.obj, attr, new)
+
+ def __exit__(self, *args):
+ for i, attr in enumerate(self.attrs):
+ try:
+ old = self.old[i]
+ except KeyError:
+ delattr(self.obj, attr)
+ else:
+ setattr(self.obj, attr, old)
+
+
+replace_attr = AttrReplace
+
+
+def replace_module_module(module, name, **kwargs):
+ return replace_attr(module, name, new_module(name, **kwargs))
+
+
+class ItemReplace(object):
+ def __init__(self, d, key, new, r=None):
+ self.key = key
+ self.new = new
+ self.d = d
+ self.r = r
+
+ def __enter__(self):
+ self.old = self.d.get(self.key)
+ self.d[self.key] = self.new
+ return self.r
+
+ def __exit__(self, *args):
+ if self.old is None:
+ try:
+ self.d.pop(self.key)
+ except KeyError:
+ pass
+ else:
+ self.d[self.key] = self.old
+
+
+def replace_item(d, key, new):
+ return ItemReplace(d, key, new, d)
+
+
+def replace_env(key, new, environ=None, **kwargs):
+ r = kwargs.copy()
+ r['environ'] = environ or {}
+ return ItemReplace(r['environ'], key, new, r)
+
+
+class PowerlineSingleTest(object):
+ def __init__(self, suite, name):
+ self.suite = suite
+ self.name = name
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ if exc_type is not None:
+ self.exception('Exception while running test: {0!r}'.format(
+ exc_value))
+
+ def fail(self, message, allow_failure=False):
+ return self.suite.fail(self.name, message, allow_failure)
+
+ def exception(self, message, allow_failure=False):
+ return self.suite.exception(self.name, message, allow_failure)
diff --git a/tests/modules/lib/config_mock.py b/tests/modules/lib/config_mock.py
new file mode 100644
index 0000000..900b60f
--- /dev/null
+++ b/tests/modules/lib/config_mock.py
@@ -0,0 +1,230 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import os
+
+from threading import Lock
+from copy import deepcopy
+from time import sleep
+from functools import wraps
+
+from powerline.renderer import Renderer
+from powerline.lib.config import ConfigLoader
+from powerline import Powerline, get_default_theme
+
+from tests.modules.lib import Args, replace_attr
+
+
+UT = get_default_theme(is_unicode=True)
+AT = get_default_theme(is_unicode=False)
+
+
+class TestHelpers(object):
+ def __init__(self, config):
+ self.config = config
+ self.access_log = []
+ self.access_lock = Lock()
+
+ def loader_condition(self, path):
+ return (path in self.config) and path
+
+ def find_config_files(self, cfg_path, config_loader, loader_callback):
+ if cfg_path.endswith('.json'):
+ cfg_path = cfg_path[:-5]
+ if cfg_path.startswith('/'):
+ cfg_path = cfg_path.lstrip('/')
+ with self.access_lock:
+ self.access_log.append('check:' + cfg_path)
+ if cfg_path in self.config:
+ yield cfg_path
+ else:
+ if config_loader:
+ config_loader.register_missing(self.loader_condition, loader_callback, cfg_path)
+ raise IOError(('fcf:' if cfg_path.endswith('raise') else '') + cfg_path)
+
+ def load_json_config(self, config_file_path, *args, **kwargs):
+ if config_file_path.endswith('.json'):
+ config_file_path = config_file_path[:-5]
+ if config_file_path.startswith('/'):
+ config_file_path = config_file_path.lstrip('/')
+ with self.access_lock:
+ self.access_log.append('load:' + config_file_path)
+ try:
+ return deepcopy(self.config[config_file_path])
+ except KeyError:
+ raise IOError(config_file_path)
+
+ def pop_events(self):
+ with self.access_lock:
+ r = self.access_log[:]
+ self.access_log = []
+ return r
+
+
+def log_call(func):
+ @wraps(func)
+ def ret(self, *args, **kwargs):
+ self._calls.append((func.__name__, args, kwargs))
+ return func(self, *args, **kwargs)
+ return ret
+
+
+class TestWatcher(object):
+ events = set()
+ lock = Lock()
+
+ def __init__(self):
+ self._calls = []
+
+ @log_call
+ def watch(self, file):
+ pass
+
+ @log_call
+ def __call__(self, file):
+ with self.lock:
+ if file in self.events:
+ self.events.remove(file)
+ return True
+ return False
+
+ def _reset(self, files):
+ with self.lock:
+ self.events.clear()
+ self.events.update(files)
+
+ @log_call
+ def unsubscribe(self):
+ pass
+
+
+class Logger(object):
+ def __init__(self):
+ self.messages = []
+ self.lock = Lock()
+
+ def _add_msg(self, attr, msg):
+ with self.lock:
+ self.messages.append(attr + ':' + msg)
+
+ def _pop_msgs(self):
+ with self.lock:
+ r = self.messages
+ self.messages = []
+ return r
+
+ def __getattr__(self, attr):
+ return lambda *args, **kwargs: self._add_msg(attr, *args, **kwargs)
+
+
+class SimpleRenderer(Renderer):
+ def hlstyle(self, fg=None, bg=None, attrs=None):
+ return '<{fg} {bg} {attrs}>'.format(fg=fg and fg[0], bg=bg and bg[0], attrs=attrs)
+
+
+class EvenSimplerRenderer(Renderer):
+ def hlstyle(self, fg=None, bg=None, attrs=None):
+ return '{{{fg}{bg}{attrs}}}'.format(
+ fg=fg and fg[0] or '-',
+ bg=bg and bg[0] or '-',
+ attrs=attrs if attrs else '',
+ )
+
+
+class TestPowerline(Powerline):
+ _created = False
+
+ def __init__(self, _helpers, **kwargs):
+ super(TestPowerline, self).__init__(**kwargs)
+ self._helpers = _helpers
+ self.find_config_files = _helpers.find_config_files
+
+ @staticmethod
+ def get_local_themes(local_themes):
+ return local_themes
+
+ @staticmethod
+ def get_config_paths():
+ return ['']
+
+ def _will_create_renderer(self):
+ return self.cr_kwargs
+
+ def _pop_events(self):
+ return self._helpers.pop_events()
+
+
+renderer = EvenSimplerRenderer
+
+
+class TestConfigLoader(ConfigLoader):
+ def __init__(self, _helpers, **kwargs):
+ watcher = TestWatcher()
+ super(TestConfigLoader, self).__init__(
+ load=_helpers.load_json_config,
+ watcher=watcher,
+ watcher_type='test',
+ **kwargs
+ )
+
+
+def get_powerline(config, **kwargs):
+ helpers = TestHelpers(config)
+ return get_powerline_raw(
+ helpers,
+ TestPowerline,
+ _helpers=helpers,
+ ext='test',
+ renderer_module='tests.modules.lib.config_mock',
+ logger=Logger(),
+ **kwargs
+ )
+
+
+def select_renderer(simpler_renderer=False):
+ global renderer
+ renderer = EvenSimplerRenderer if simpler_renderer else SimpleRenderer
+
+
+def get_powerline_raw(helpers, PowerlineClass, replace_gcp=False, **kwargs):
+ if not isinstance(helpers, TestHelpers):
+ helpers = TestHelpers(helpers)
+ select_renderer(kwargs.pop('simpler_renderer', False))
+
+ if replace_gcp:
+ class PowerlineClass(PowerlineClass):
+ @staticmethod
+ def get_config_paths():
+ return ['/']
+
+ pl = PowerlineClass(
+ config_loader=TestConfigLoader(
+ _helpers=helpers,
+ run_once=kwargs.get('run_once')
+ ),
+ **kwargs
+ )
+ pl._watcher = pl.config_loader.watcher
+ return pl
+
+
+def swap_attributes(config, powerline_module):
+ return replace_attr(powerline_module, 'os', Args(
+ path=Args(
+ isfile=lambda path: path.lstrip('/').replace('.json', '') in config,
+ join=os.path.join,
+ expanduser=lambda path: path,
+ realpath=lambda path: path,
+ dirname=os.path.dirname,
+ ),
+ environ={},
+ ))
+
+
+def add_watcher_events(p, *args, **kwargs):
+ if isinstance(p._watcher, TestWatcher):
+ p._watcher._reset(args)
+ while not p._will_create_renderer():
+ sleep(kwargs.get('interval', 0.1))
+ if not kwargs.get('wait', True):
+ return
diff --git a/tests/modules/lib/fsconfig.py b/tests/modules/lib/fsconfig.py
new file mode 100644
index 0000000..757e874
--- /dev/null
+++ b/tests/modules/lib/fsconfig.py
@@ -0,0 +1,83 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import os
+import json
+
+from subprocess import check_call
+from shutil import rmtree
+from itertools import chain
+
+from powerline import Powerline
+
+
+CONFIG_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'config')
+
+
+class TestPowerline(Powerline):
+ def __init__(self, _paths, *args, **kwargs):
+ super(TestPowerline, self).__init__(*args, **kwargs)
+ self._paths = _paths
+
+ def get_config_paths(self):
+ return self._paths
+
+
+def mkdir_recursive(directory):
+ if os.path.isdir(directory):
+ return
+ mkdir_recursive(os.path.dirname(directory))
+ os.mkdir(directory)
+
+
+class FSTree(object):
+ __slots__ = ('tree', 'p', 'p_kwargs', 'create_p', 'get_config_paths', 'root')
+
+ def __init__(
+ self,
+ tree,
+ p_kwargs={'run_once': True},
+ root=CONFIG_DIR,
+ get_config_paths=lambda p: (p,),
+ create_p=False
+ ):
+ self.tree = tree
+ self.root = root
+ self.get_config_paths = get_config_paths
+ self.create_p = create_p
+ self.p = None
+ self.p_kwargs = p_kwargs
+
+ def __enter__(self, *args):
+ os.mkdir(self.root)
+ for k, v in self.tree.items():
+ fname = os.path.join(self.root, k) + '.json'
+ mkdir_recursive(os.path.dirname(fname))
+ with open(fname, 'w') as F:
+ json.dump(v, F)
+ if self.create_p:
+ self.p = TestPowerline(
+ _paths=self.get_config_paths(self.root),
+ ext='test',
+ renderer_module='tests.modules.lib.config_mock',
+ **self.p_kwargs
+ )
+ if os.environ.get('POWERLINE_RUN_LINT_DURING_TESTS'):
+ try:
+ check_call(chain(['scripts/powerline-lint'], *[
+ ('-p', d) for d in (
+ self.p.get_config_paths() if self.p
+ else self.get_config_paths(self.root)
+ )
+ ]))
+ except:
+ self.__exit__()
+ raise
+ return self.p and self.p.__enter__(*args)
+
+ def __exit__(self, *args):
+ try:
+ rmtree(self.root)
+ finally:
+ if self.p:
+ self.p.__exit__(*args)
diff --git a/tests/modules/lib/terminal.py b/tests/modules/lib/terminal.py
new file mode 100644
index 0000000..540135d
--- /dev/null
+++ b/tests/modules/lib/terminal.py
@@ -0,0 +1,307 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import threading
+import os
+
+from time import sleep
+from itertools import groupby
+from signal import SIGKILL
+from difflib import ndiff
+
+import pexpect
+
+from powerline.lib.unicode import u
+
+from tests.modules.lib.vterm import VTerm, Dimensions
+
+
+class MutableDimensions(object):
+ def __init__(self, rows, cols):
+ super(MutableDimensions, self).__init__()
+ self._list = [rows, cols]
+
+ def __getitem__(self, idx):
+ return self._list[idx]
+
+ def __setitem__(self, idx, val):
+ self._list[idx] = val
+
+ def __iter__(self):
+ return iter(self._list)
+
+ def __len__(self):
+ return 2
+
+ def __nonzero__(self):
+ return True
+
+ __bool__ = __nonzero__
+
+ rows = property(
+ fget = lambda self: self._list[0],
+ fset = lambda self, val: self._list.__setitem__(0, val),
+ )
+ cols = property(
+ fget = lambda self: self._list[1],
+ fset = lambda self, val: self._list.__setitem__(1, val),
+ )
+
+
+class ExpectProcess(threading.Thread):
+ def __init__(self, lib, dim, cmd, args, cwd=None, env=None):
+ super(ExpectProcess, self).__init__()
+ self.vterm = VTerm(lib, dim)
+ self.lock = threading.Lock()
+ self.dim = Dimensions(*dim)
+ self.cmd = cmd
+ self.args = args
+ self.cwd = cwd
+ self.env = env
+ self.buffer = []
+ self.child_lock = threading.Lock()
+ self.shutdown_event = threading.Event()
+ self.started_event = threading.Event()
+
+ def run(self):
+ with self.child_lock:
+ child = pexpect.spawn(self.cmd, self.args, cwd=self.cwd,
+ env=self.env)
+ sleep(0.5)
+ child.setwinsize(self.dim.rows, self.dim.cols)
+ sleep(0.5)
+ self.child = child
+ self.started_event.set()
+ status = None
+ while status is None and not self.shutdown_event.is_set():
+ try:
+ with self.child_lock:
+ s = child.read_nonblocking(size=1024, timeout=0)
+ status = child.status
+ except pexpect.TIMEOUT:
+ pass
+ except pexpect.EOF:
+ break
+ else:
+ with self.lock:
+ self.vterm.push(s)
+ self.buffer.append(s)
+
+ if status is None:
+ child.kill(SIGKILL)
+
+ def kill(self):
+ self.shutdown_event.set()
+
+ def resize(self, dim):
+ with self.child_lock:
+ self.dim = Dimensions(*dim)
+ self.child.setwinsize(self.dim.rows, self.dim.cols)
+ self.vterm.resize(self.dim)
+
+ def __getitem__(self, position):
+ with self.lock:
+ return self.vterm.vtscreen[position]
+
+ def read(self):
+ with self.lock:
+ ret = b''.join(self.buffer)
+ del self.buffer[:]
+ return ret
+
+ def send(self, data):
+ with self.child_lock:
+ self.child.send(data)
+
+ def get_highlighted_text(self, text, attrs, default_props=(),
+ use_escapes=False):
+ ret = []
+ new_attrs = attrs.copy()
+ for cell_properties, segment_text in text:
+ if use_escapes:
+ escapes = ('\033[38;2;{0};{1};{2};48;2;{3};{4};{5}'.format(
+ *(cell_properties[0] + cell_properties[1]))) + (
+ ';1' if cell_properties[2] else ''
+ ) + (
+ ';3' if cell_properties[3] else ''
+ ) + (
+ ';4' if cell_properties[4] else ''
+ ) + 'm'
+ ret.append(escapes + segment_text + '\033[0m')
+ else:
+ segment_text = segment_text.translate({'{': '{{', '}': '}}'})
+ if cell_properties not in new_attrs:
+ new_attrs[cell_properties] = len(new_attrs) + 1
+ props_name = new_attrs[cell_properties]
+ if props_name in default_props:
+ ret.append(segment_text)
+ else:
+ ret.append('{' + str(props_name) + ':' + segment_text + '}')
+ return ''.join(ret), new_attrs
+
+ def get_row(self, row, attrs, default_props=(), use_escapes=False):
+ with self.lock:
+ return self.get_highlighted_text((
+ (key, ''.join((cell.text for cell in subline)))
+ for key, subline in groupby((
+ self.vterm.vtscreen[row, col]
+ for col in range(self.dim.cols)
+ ), lambda cell: cell.cell_properties_key)
+ ), attrs, default_props, use_escapes)
+
+ def get_screen(self, attrs, default_props=(), use_escapes=False):
+ lines = []
+ for row in range(self.dim.rows):
+ line, attrs = self.get_row(row, attrs, default_props, use_escapes)
+ lines.append(line)
+ return '\n'.join(lines), attrs
+
+
+def test_expected_result(p, test, last_attempt, last_attempt_cb, attempts):
+ debugging_tests = not not os.environ.get('_POWERLINE_DEBUGGING_TESTS')
+ expected_text, attrs = test['expected_result']
+ result = None
+ while attempts:
+ if 'row' in test:
+ row = test['row']
+ else:
+ row = p.dim.rows - 1
+ while row >= 0 and not p[row, 0].text:
+ row -= 1
+ if row < 0:
+ row = 0
+ actual_text, all_attrs = p.get_row(row, attrs)
+ if actual_text == expected_text:
+ return True
+ attempts -= 1
+ print('Actual result does not match expected for row {0}. Attempts left: {1}.'.format(
+ row, attempts))
+ sleep(2)
+ print('Result (row {0}):'.format(row))
+ print(actual_text)
+ print('Expected:')
+ print(expected_text)
+ print('Attributes:')
+ for v, k in sorted(
+ ((v, k) for k, v in all_attrs.items()),
+ key=(lambda t: '%02u'.format(t[0]) if isinstance(t[0], int) else t[0]),
+ ):
+ print('{k!r}: {v!r},'.format(v=v, k=k))
+ print('Screen:')
+ screen, screen_attrs = p.get_screen(attrs, use_escapes=debugging_tests)
+ print(screen)
+ print(screen_attrs)
+ print('_' * 80)
+ print('Diff:')
+ print('=' * 80)
+ print(''.join((
+ u(line) for line in ndiff([actual_text + '\n'], [expected_text + '\n']))
+ ))
+ if last_attempt and last_attempt_cb:
+ last_attempt_cb()
+ return False
+
+
+ENV_BASE = {
+ # Reasoning:
+ # 1. vt* TERMs (used to be vt100 here) make tmux-1.9 use different and
+ # identical colors for inactive windows. This is not like tmux-1.6:
+ # foreground color is different from separator color and equal to (0,
+ # 102, 153) for some reason (separator has correct color). tmux-1.8 is
+ # fine, so are older versions (though tmux-1.6 and tmux-1.7 do not have
+ # highlighting for previously active window) and my system tmux-1.9a.
+ # 2. screen, xterm and some other non-256color terminals both have the same
+ # issue and make libvterm emit complains like `Unhandled CSI SGR 3231`.
+ # 3. screen-256color, xterm-256color and other -256color terminals make
+ # libvterm emit complains about unhandled escapes to stderr.
+ # 4. `st-256color` does not have any of the above problems, but it may be
+ # not present on the target system because it is installed with
+ # x11-terms/st and not with sys-libs/ncurses.
+ #
+ # For the given reasons decision was made: to fix tmux-1.9 tests and not
+ # make libvterm emit any data to stderr st-256color $TERM should be used, up
+ # until libvterm has its own terminfo database entry (if it ever will). To
+ # make sure that relevant terminfo entry is present on the target system it
+ # should be distributed with powerline test package. To make distribution
+ # not require modifying anything outside of powerline test directory
+ # TERMINFO variable is set.
+ #
+ # This fix propagates to non-tmux vterm tests just in case.
+ 'TERM': 'st-256color',
+ # Also $TERMINFO definition in get_env
+
+ 'POWERLINE_CONFIG_PATHS': os.path.abspath('powerline/config_files'),
+ 'POWERLINE_COMMAND': 'powerline-render',
+ 'LD_LIBRARY_PATH': os.environ.get('LD_LIBRARY_PATH', ''),
+ 'PYTHONPATH': os.environ.get('PYTHONPATH', ''),
+}
+
+
+def get_env(vterm_path, test_dir, *args, **kwargs):
+ env = ENV_BASE.copy()
+ env.update({
+ 'TERMINFO': os.path.join(test_dir, 'terminfo'),
+ 'PATH': vterm_path,
+ 'SHELL': os.path.join(vterm_path, 'bash'),
+ })
+ env.update(*args, **kwargs)
+ return env
+
+
+def do_terminal_tests(tests, cmd, dim, args, env, suite, cwd=None, fin_cb=None,
+ last_attempt_cb=None, attempts=None):
+ debugging_tests = not not os.environ.get('_POWERLINE_DEBUGGING_TESTS')
+ default_attempts = 2 if debugging_tests else 3
+ if attempts is None:
+ attempts = default_attempts
+ lib = os.environ.get('POWERLINE_LIBVTERM')
+ if not lib:
+ if os.path.exists('tests/bot-ci/deps/libvterm/libvterm.so'):
+ lib = 'tests/bot-ci/deps/libvterm/libvterm.so'
+ else:
+ lib = 'libvterm.so'
+
+ while attempts:
+ try:
+ p = ExpectProcess(
+ lib=lib,
+ dim=dim,
+ cmd=cmd,
+ args=args,
+ cwd=cwd,
+ env=env,
+ )
+ p.start()
+ p.started_event.wait()
+
+ ret = True
+
+ for i, test in enumerate(tests):
+ with suite.test(test.get('name', 'test_{0}'.format(i)),
+ attempts - 1) as ptest:
+ try:
+ test_prep = test['prep_cb']
+ except KeyError:
+ pass
+ else:
+ test_prep(p)
+ test_result = test_expected_result(
+ p, test, attempts == 0, last_attempt_cb,
+ test.get('attempts', default_attempts)
+ )
+ if not test_result:
+ ptest.fail('Result does not match expected')
+ ret = ret and test_result
+
+ if ret:
+ return ret
+ finally:
+ if fin_cb:
+ fin_cb(p=p, cmd=cmd, env=env)
+ p.kill()
+ p.join(10)
+ assert(not p.isAlive())
+
+ attempts -= 1
+
+ return False
diff --git a/tests/modules/lib/vterm.py b/tests/modules/lib/vterm.py
new file mode 100644
index 0000000..1984e1b
--- /dev/null
+++ b/tests/modules/lib/vterm.py
@@ -0,0 +1,193 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import ctypes
+
+from collections import namedtuple
+
+from powerline.lib.unicode import unicode, unichr, tointiter
+
+
+Dimensions = namedtuple('Dimensions', ('rows', 'cols'))
+
+
+class CTypesFunction(object):
+ def __init__(self, library, name, rettype, args):
+ self.name = name
+ self.prototype = ctypes.CFUNCTYPE(rettype, *[
+ arg[1] for arg in args
+ ])
+ self.args = args
+ self.func = self.prototype((name, library), tuple((
+ (1, arg[0]) for arg in args
+ )))
+
+ def __call__(self, *args, **kwargs):
+ return self.func(*args, **kwargs)
+
+ def __repr__(self):
+ return '{cls}(<library>, {name!r}, {rettype!r}, {args!r})'.format(
+ cls=self.__class__.__name__,
+ **self.__dict__
+ )
+
+
+class CTypesLibraryFuncsCollection(object):
+ def __init__(self, lib, **kwargs):
+ self.lib = lib
+ library_loader = ctypes.LibraryLoader(ctypes.CDLL)
+ library = library_loader.LoadLibrary(lib)
+ self.library = library
+ for name, args in kwargs.items():
+ self.__dict__[name] = CTypesFunction(library, name, *args)
+
+
+class VTermPos_s(ctypes.Structure):
+ _fields_ = (
+ ('row', ctypes.c_int),
+ ('col', ctypes.c_int),
+ )
+
+
+class VTermColor_s(ctypes.Structure):
+ _fields_ = (
+ ('red', ctypes.c_uint8),
+ ('green', ctypes.c_uint8),
+ ('blue', ctypes.c_uint8),
+ )
+
+
+class VTermScreenCellAttrs_s(ctypes.Structure):
+ _fields_ = (
+ ('bold', ctypes.c_uint, 1),
+ ('underline', ctypes.c_uint, 2),
+ ('italic', ctypes.c_uint, 1),
+ ('blink', ctypes.c_uint, 1),
+ ('reverse', ctypes.c_uint, 1),
+ ('strike', ctypes.c_uint, 1),
+ ('font', ctypes.c_uint, 4),
+ ('dwl', ctypes.c_uint, 1),
+ ('dhl', ctypes.c_uint, 2),
+ )
+
+
+VTERM_MAX_CHARS_PER_CELL = 6
+
+
+class VTermScreenCell_s(ctypes.Structure):
+ _fields_ = (
+ ('chars', ctypes.ARRAY(ctypes.c_uint32, VTERM_MAX_CHARS_PER_CELL)),
+ ('width', ctypes.c_char),
+ ('attrs', VTermScreenCellAttrs_s),
+ ('fg', VTermColor_s),
+ ('bg', VTermColor_s),
+ )
+
+
+VTerm_p = ctypes.c_void_p
+VTermScreen_p = ctypes.c_void_p
+
+
+def get_functions(lib):
+ return CTypesLibraryFuncsCollection(
+ lib,
+ vterm_new=(VTerm_p, (
+ ('rows', ctypes.c_int),
+ ('cols', ctypes.c_int)
+ )),
+ vterm_obtain_screen=(VTermScreen_p, (('vt', VTerm_p),)),
+ vterm_set_size=(None, (
+ ('vt', VTerm_p),
+ ('rows', ctypes.c_int),
+ ('cols', ctypes.c_int)
+ )),
+ vterm_screen_reset=(None, (
+ ('screen', VTermScreen_p),
+ ('hard', ctypes.c_int)
+ )),
+ vterm_input_write=(ctypes.c_size_t, (
+ ('vt', VTerm_p),
+ ('bytes', ctypes.POINTER(ctypes.c_char)),
+ ('size', ctypes.c_size_t),
+ )),
+ vterm_screen_get_cell=(ctypes.c_int, (
+ ('screen', VTermScreen_p),
+ ('pos', VTermPos_s),
+ ('cell', ctypes.POINTER(VTermScreenCell_s))
+ )),
+ vterm_free=(None, (('vt', VTerm_p),)),
+ vterm_set_utf8=(None, (('vt', VTerm_p), ('is_utf8', ctypes.c_int))),
+ )
+
+
+class VTermColor(object):
+ __slots__ = ('red', 'green', 'blue')
+
+ def __init__(self, color):
+ self.red = color.red
+ self.green = color.green
+ self.blue = color.blue
+
+ @property
+ def color_key(self):
+ return (self.red, self.green, self.blue)
+
+
+class VTermScreenCell(object):
+ def __init__(self, vtsc):
+ for field in VTermScreenCellAttrs_s._fields_:
+ field_name = field[0]
+ setattr(self, field_name, getattr(vtsc.attrs, field_name))
+ self.text = ''.join((
+ unichr(vtsc.chars[i]) for i in range(VTERM_MAX_CHARS_PER_CELL)
+ )).rstrip('\x00')
+ self.width = next(tointiter(vtsc.width))
+ self.fg = VTermColor(vtsc.fg)
+ self.bg = VTermColor(vtsc.bg)
+ self.cell_properties_key = (
+ self.fg.color_key,
+ self.bg.color_key,
+ self.bold,
+ self.underline,
+ self.italic,
+ )
+
+
+class VTermScreen(object):
+ def __init__(self, functions, screen):
+ self.functions = functions
+ self.screen = screen
+
+ def __getitem__(self, position):
+ pos = VTermPos_s(*position)
+ cell = VTermScreenCell_s()
+ ret = self.functions.vterm_screen_get_cell(self.screen, pos, cell)
+ if ret != 1:
+ raise ValueError('vterm_screen_get_cell returned {0}'.format(ret))
+ return VTermScreenCell(cell)
+
+ def reset(self, hard):
+ self.functions.vterm_screen_reset(self.screen, int(bool(hard)))
+
+
+class VTerm(object):
+ def __init__(self, lib, dim):
+ self.functions = get_functions(lib)
+ self.vt = self.functions.vterm_new(dim.rows, dim.cols)
+ self.functions.vterm_set_utf8(self.vt, 1)
+ self.vtscreen = VTermScreen(self.functions, self.functions.vterm_obtain_screen(self.vt))
+ self.vtscreen.reset(True)
+
+ def push(self, data):
+ if isinstance(data, unicode):
+ data = data.encode('utf-8')
+ return self.functions.vterm_input_write(self.vt, data, len(data))
+
+ def resize(self, dim):
+ self.functions.vterm_set_size(self.vt, dim.rows, dim.cols)
+
+ def __del__(self):
+ try:
+ self.functions.vterm_free(self.vt)
+ except AttributeError:
+ pass