diff options
Diffstat (limited to 'tests/modules')
-rw-r--r-- | tests/modules/__init__.py | 94 | ||||
-rw-r--r-- | tests/modules/lib/__init__.py | 183 | ||||
-rw-r--r-- | tests/modules/lib/config_mock.py | 230 | ||||
-rw-r--r-- | tests/modules/lib/fsconfig.py | 83 | ||||
-rw-r--r-- | tests/modules/lib/terminal.py | 307 | ||||
-rw-r--r-- | tests/modules/lib/vterm.py | 193 | ||||
-rw-r--r-- | tests/modules/matchers.py | 6 | ||||
-rw-r--r-- | tests/modules/vim.py | 927 |
8 files changed, 2023 insertions, 0 deletions
diff --git a/tests/modules/__init__.py b/tests/modules/__init__.py new file mode 100644 index 0000000..12aae20 --- /dev/null +++ b/tests/modules/__init__.py @@ -0,0 +1,94 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import sys +import os + +if sys.version_info < (2, 7): + from unittest2 import TestCase as _TestCase # NOQA + from unittest2 import main as _main # NOQA + from unittest2.case import SkipTest # NOQA +else: + from unittest import TestCase as _TestCase # NOQA + from unittest import main as _main # NOQA + from unittest.case import SkipTest # NOQA + +from tests.modules.lib import PowerlineSingleTest + + +class PowerlineDummyTest(object): + def __enter__(self): + return self + + def __exit__(self, *args): + pass + + def fail(self, *args, **kwargs): + pass + + def exception(self, *args, **kwargs): + pass + + +class PowerlineTestSuite(object): + def __init__(self, name): + self.name = name + + def __enter__(self): + self.saved_current_suite = os.environ['POWERLINE_CURRENT_SUITE'] + os.environ['POWERLINE_CURRENT_SUITE'] = ( + self.saved_current_suite + '/' + self.name) + self.suite = self.saved_current_suite + '/' + self.name + return self + + def __exit__(self, exc_type, exc_value, traceback): + if exc_type is not None: + self.exception( + 'suite_noexcept', + 'Exception while running test suite: {0!r}'.format(exc_value), + ) + os.environ['POWERLINE_CURRENT_SUITE'] = self.saved_current_suite + + def record_test_failure(self, fail_char, test_name, message, allow_failure=False): + if allow_failure: + fail_char = 'A' + fail_char + full_msg = '{fail_char} {suite}|{test_name} :: {message}'.format( + fail_char=fail_char, + suite=self.suite, + test_name=test_name, + message=message, + ) + with open(os.environ['FAILURES_FILE'], 'a') as ffd: + ffd.write(full_msg + '\n') + return False + + def exception(self, test_name, message, allow_failure=False): + return self.record_test_failure('E', test_name, message, allow_failure) + + def fail(self, test_name, message, allow_failure=False): + return self.record_test_failure('F', test_name, message, allow_failure) + + def test(self, name, attempts_left=0): + if not attempts_left: + return PowerlineSingleTest(self, name) + else: + return PowerlineDummyTest() + + def subsuite(self, name): + return PowerlineTestSuite(name) + + +suite = None + + +def main(*args, **kwargs): + global suite + suite = PowerlineTestSuite(sys.argv[0]) + _main(*args, **kwargs) + + +class TestCase(_TestCase): + def fail(self, msg=None): + suite.fail(self.__class__.__name__, + msg or 'Test failed without message') + super(TestCase, self).fail(*args, **kwargs) diff --git a/tests/modules/lib/__init__.py b/tests/modules/lib/__init__.py new file mode 100644 index 0000000..d45ccae --- /dev/null +++ b/tests/modules/lib/__init__.py @@ -0,0 +1,183 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import imp +import sys + + +class Pl(object): + def __init__(self): + self.exceptions = [] + self.errors = [] + self.warns = [] + self.debugs = [] + self.infos = [] + self.prefix = None + self.use_daemon_threads = True + + for meth in ('error', 'warn', 'debug', 'exception', 'info'): + exec(( + 'def {0}(self, msg, *args, **kwargs):\n' + ' self.{0}s.append((kwargs.get("prefix") or self.prefix, msg, args, kwargs))\n' + ).format(meth)) + + def __nonzero__(self): + return bool(self.exceptions or self.errors or self.warns) + + __bool__ = __nonzero__ + + +class Args(object): + theme_override = {} + config_override = {} + config_path = None + ext = ['shell'] + renderer_module = None + + def __init__(self, **kwargs): + self.__dict__.update(kwargs) + + +def urllib_read(query_url): + if query_url.startswith('http://ipv'): + if query_url.startswith('http://ipv4.icanhazip.com'): + return '127.0.0.1' + elif query_url.startswith('http://ipv4.icanhazip.com'): + return '2001:4801:7818:6:abc5:ba2c:ff10:275f' + elif query_url.startswith('http://geoip.nekudo.com/api/'): + return '{"city":"Meppen","country":{"name":"Germany", "code":"DE"},"location":{"accuracy_radius":100,"latitude":52.6833,"longitude":7.3167,"time_zone":"Europe\/Berlin"},"ip":"82.145.55.16"}' + elif query_url.startswith('http://query.yahooapis.com/v1/public/'): + if 'Meppen' in query_url: + return r'{"query":{"count":1,"created":"2016-05-13T19:43:18Z","lang":"en-US","results":{"channel":{"units":{"distance":"mi","pressure":"in","speed":"mph","temperature":"C"},"title":"Yahoo! Weather - Meppen, NI, DE","link":"http://us.rd.yahoo.com/dailynews/rss/weather/Country__Country/*https://weather.yahoo.com/country/state/city-674836/","description":"Yahoo! Weather for Meppen, NI, DE","language":"en-us","lastBuildDate":"Fri, 13 May 2016 09:43 PM CEST","ttl":"60","location":{"city":"Meppen","country":"Germany","region":" NI"},"wind":{"chill":"55","direction":"350","speed":"25"},"atmosphere":{"humidity":"57","pressure":"1004.0","rising":"0","visibility":"16.1"},"astronomy":{"sunrise":"5:35 am","sunset":"9:21 pm"},"image":{"title":"Yahoo! Weather","width":"142","height":"18","link":"http://weather.yahoo.com","url":"http://l.yimg.com/a/i/brand/purplelogo//uh/us/news-wea.gif"},"item":{"title":"Conditions for Meppen, NI, DE at 08:00 PM CEST","lat":"52.68993","long":"7.29115","link":"http://us.rd.yahoo.com/dailynews/rss/weather/Country__Country/*https://weather.yahoo.com/country/state/city-674836/","pubDate":"Fri, 13 May 2016 08:00 PM CEST","condition":{"code":"23","date":"Fri, 13 May 2016 08:00 PM CEST","temp":"14","text":"Breezy"},"forecast":[{"code":"30","date":"13 May 2016","day":"Fri","high":"71","low":"48","text":"Partly Cloudy"},{"code":"28","date":"14 May 2016","day":"Sat","high":"54","low":"44","text":"Mostly Cloudy"},{"code":"11","date":"15 May 2016","day":"Sun","high":"55","low":"43","text":"Showers"},{"code":"28","date":"16 May 2016","day":"Mon","high":"54","low":"42","text":"Mostly Cloudy"},{"code":"28","date":"17 May 2016","day":"Tue","high":"57","low":"43","text":"Mostly Cloudy"},{"code":"12","date":"18 May 2016","day":"Wed","high":"62","low":"45","text":"Rain"},{"code":"28","date":"19 May 2016","day":"Thu","high":"63","low":"48","text":"Mostly Cloudy"},{"code":"28","date":"20 May 2016","day":"Fri","high":"67","low":"50","text":"Mostly Cloudy"},{"code":"30","date":"21 May 2016","day":"Sat","high":"71","low":"50","text":"Partly Cloudy"},{"code":"30","date":"22 May 2016","day":"Sun","high":"74","low":"54","text":"Partly Cloudy"}],"description":"<![CDATA[<img src=\"http://l.yimg.com/a/i/us/we/52/23.gif\"/>\n<BR />\n<b>Current Conditions:</b>\n<BR />Breezy\n<BR />\n<BR />\n<b>Forecast:</b>\n<BR /> Fri - Partly Cloudy. High: 71Low: 48\n<BR /> Sat - Mostly Cloudy. High: 54Low: 44\n<BR /> Sun - Showers. High: 55Low: 43\n<BR /> Mon - Mostly Cloudy. High: 54Low: 42\n<BR /> Tue - Mostly Cloudy. High: 57Low: 43\n<BR />\n<BR />\n<a href=\"http://us.rd.yahoo.com/dailynews/rss/weather/Country__Country/*https://weather.yahoo.com/country/state/city-674836/\">Full Forecast at Yahoo! Weather</a>\n<BR />\n<BR />\n(provided by <a href=\"http://www.weather.com\" >The Weather Channel</a>)\n<BR />\n]]>","guid":{"isPermaLink":"false"}}}}}}' + elif 'Moscow' in query_url: + return r'{"query":{"count":1,"created":"2016-05-13T19:47:01Z","lang":"en-US","results":{"channel":{"units":{"distance":"mi","pressure":"in","speed":"mph","temperature":"C"},"title":"Yahoo! Weather - Moscow, Moscow Federal City, RU","link":"http://us.rd.yahoo.com/dailynews/rss/weather/Country__Country/*https://weather.yahoo.com/country/state/city-2122265/","description":"Yahoo! Weather for Moscow, Moscow Federal City, RU","language":"en-us","lastBuildDate":"Fri, 13 May 2016 10:47 PM MSK","ttl":"60","location":{"city":"Moscow","country":"Russia","region":" Moscow Federal City"},"wind":{"chill":"45","direction":"80","speed":"11"},"atmosphere":{"humidity":"52","pressure":"993.0","rising":"0","visibility":"16.1"},"astronomy":{"sunrise":"4:19 am","sunset":"8:34 pm"},"image":{"title":"Yahoo! Weather","width":"142","height":"18","link":"http://weather.yahoo.com","url":"http://l.yimg.com/a/i/brand/purplelogo//uh/us/news-wea.gif"},"item":{"title":"Conditions for Moscow, Moscow Federal City, RU at 09:00 PM MSK","lat":"55.741638","long":"37.605061","link":"http://us.rd.yahoo.com/dailynews/rss/weather/Country__Country/*https://weather.yahoo.com/country/state/city-2122265/","pubDate":"Fri, 13 May 2016 09:00 PM MSK","condition":{"code":"33","date":"Fri, 13 May 2016 09:00 PM MSK","temp":"9","text":"Mostly Clear"},"forecast":[{"code":"30","date":"13 May 2016","day":"Fri","high":"62","low":"41","text":"Partly Cloudy"},{"code":"30","date":"14 May 2016","day":"Sat","high":"64","low":"43","text":"Partly Cloudy"},{"code":"30","date":"15 May 2016","day":"Sun","high":"63","low":"44","text":"Partly Cloudy"},{"code":"12","date":"16 May 2016","day":"Mon","high":"60","low":"47","text":"Rain"},{"code":"12","date":"17 May 2016","day":"Tue","high":"64","low":"48","text":"Rain"},{"code":"28","date":"18 May 2016","day":"Wed","high":"67","low":"48","text":"Mostly Cloudy"},{"code":"12","date":"19 May 2016","day":"Thu","high":"68","low":"49","text":"Rain"},{"code":"39","date":"20 May 2016","day":"Fri","high":"66","low":"50","text":"Scattered Showers"},{"code":"39","date":"21 May 2016","day":"Sat","high":"69","low":"49","text":"Scattered Showers"},{"code":"30","date":"22 May 2016","day":"Sun","high":"73","low":"50","text":"Partly Cloudy"}],"description":"<![CDATA[<img src=\"http://l.yimg.com/a/i/us/we/52/33.gif\"/>\n<BR />\n<b>Current Conditions:</b>\n<BR />Mostly Clear\n<BR />\n<BR />\n<b>Forecast:</b>\n<BR /> Fri - Partly Cloudy. High: 62Low: 41\n<BR /> Sat - Partly Cloudy. High: 64Low: 43\n<BR /> Sun - Partly Cloudy. High: 63Low: 44\n<BR /> Mon - Rain. High: 60Low: 47\n<BR /> Tue - Rain. High: 64Low: 48\n<BR />\n<BR />\n<a href=\"http://us.rd.yahoo.com/dailynews/rss/weather/Country__Country/*https://weather.yahoo.com/country/state/city-2122265/\">Full Forecast at Yahoo! Weather</a>\n<BR />\n<BR />\n(provided by <a href=\"http://www.weather.com\" >The Weather Channel</a>)\n<BR />\n]]>","guid":{"isPermaLink":"false"}}}}}}' + else: + raise NotImplementedError + + +class Process(object): + def __init__(self, output, err): + self.output = output + self.err = err + + def communicate(self): + return self.output, self.err + + +class ModuleReplace(object): + def __init__(self, name, new): + self.name = name + self.new = new + + def __enter__(self): + self.old = sys.modules.get(self.name) + if not self.old: + try: + self.old = __import__(self.name) + except ImportError: + pass + sys.modules[self.name] = self.new + + def __exit__(self, *args): + if self.old: + sys.modules[self.name] = self.old + else: + sys.modules.pop(self.name) + + +def replace_module(name, new=None, **kwargs): + if not new: + new = new_module(name, **kwargs) + return ModuleReplace(name, new) + + +def new_module(name, **kwargs): + module = imp.new_module(name) + for k, v in kwargs.items(): + setattr(module, k, v) + return module + + +class AttrReplace(object): + def __init__(self, obj, *args): + self.obj = obj + self.attrs = args[::2] + self.new = args[1::2] + + def __enter__(self): + self.old = {} + for i, attr in enumerate(self.attrs): + try: + self.old[i] = getattr(self.obj, attr) + except AttributeError: + pass + for attr, new in zip(self.attrs, self.new): + setattr(self.obj, attr, new) + + def __exit__(self, *args): + for i, attr in enumerate(self.attrs): + try: + old = self.old[i] + except KeyError: + delattr(self.obj, attr) + else: + setattr(self.obj, attr, old) + + +replace_attr = AttrReplace + + +def replace_module_module(module, name, **kwargs): + return replace_attr(module, name, new_module(name, **kwargs)) + + +class ItemReplace(object): + def __init__(self, d, key, new, r=None): + self.key = key + self.new = new + self.d = d + self.r = r + + def __enter__(self): + self.old = self.d.get(self.key) + self.d[self.key] = self.new + return self.r + + def __exit__(self, *args): + if self.old is None: + try: + self.d.pop(self.key) + except KeyError: + pass + else: + self.d[self.key] = self.old + + +def replace_item(d, key, new): + return ItemReplace(d, key, new, d) + + +def replace_env(key, new, environ=None, **kwargs): + r = kwargs.copy() + r['environ'] = environ or {} + return ItemReplace(r['environ'], key, new, r) + + +class PowerlineSingleTest(object): + def __init__(self, suite, name): + self.suite = suite + self.name = name + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + if exc_type is not None: + self.exception('Exception while running test: {0!r}'.format( + exc_value)) + + def fail(self, message, allow_failure=False): + return self.suite.fail(self.name, message, allow_failure) + + def exception(self, message, allow_failure=False): + return self.suite.exception(self.name, message, allow_failure) diff --git a/tests/modules/lib/config_mock.py b/tests/modules/lib/config_mock.py new file mode 100644 index 0000000..900b60f --- /dev/null +++ b/tests/modules/lib/config_mock.py @@ -0,0 +1,230 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import os + +from threading import Lock +from copy import deepcopy +from time import sleep +from functools import wraps + +from powerline.renderer import Renderer +from powerline.lib.config import ConfigLoader +from powerline import Powerline, get_default_theme + +from tests.modules.lib import Args, replace_attr + + +UT = get_default_theme(is_unicode=True) +AT = get_default_theme(is_unicode=False) + + +class TestHelpers(object): + def __init__(self, config): + self.config = config + self.access_log = [] + self.access_lock = Lock() + + def loader_condition(self, path): + return (path in self.config) and path + + def find_config_files(self, cfg_path, config_loader, loader_callback): + if cfg_path.endswith('.json'): + cfg_path = cfg_path[:-5] + if cfg_path.startswith('/'): + cfg_path = cfg_path.lstrip('/') + with self.access_lock: + self.access_log.append('check:' + cfg_path) + if cfg_path in self.config: + yield cfg_path + else: + if config_loader: + config_loader.register_missing(self.loader_condition, loader_callback, cfg_path) + raise IOError(('fcf:' if cfg_path.endswith('raise') else '') + cfg_path) + + def load_json_config(self, config_file_path, *args, **kwargs): + if config_file_path.endswith('.json'): + config_file_path = config_file_path[:-5] + if config_file_path.startswith('/'): + config_file_path = config_file_path.lstrip('/') + with self.access_lock: + self.access_log.append('load:' + config_file_path) + try: + return deepcopy(self.config[config_file_path]) + except KeyError: + raise IOError(config_file_path) + + def pop_events(self): + with self.access_lock: + r = self.access_log[:] + self.access_log = [] + return r + + +def log_call(func): + @wraps(func) + def ret(self, *args, **kwargs): + self._calls.append((func.__name__, args, kwargs)) + return func(self, *args, **kwargs) + return ret + + +class TestWatcher(object): + events = set() + lock = Lock() + + def __init__(self): + self._calls = [] + + @log_call + def watch(self, file): + pass + + @log_call + def __call__(self, file): + with self.lock: + if file in self.events: + self.events.remove(file) + return True + return False + + def _reset(self, files): + with self.lock: + self.events.clear() + self.events.update(files) + + @log_call + def unsubscribe(self): + pass + + +class Logger(object): + def __init__(self): + self.messages = [] + self.lock = Lock() + + def _add_msg(self, attr, msg): + with self.lock: + self.messages.append(attr + ':' + msg) + + def _pop_msgs(self): + with self.lock: + r = self.messages + self.messages = [] + return r + + def __getattr__(self, attr): + return lambda *args, **kwargs: self._add_msg(attr, *args, **kwargs) + + +class SimpleRenderer(Renderer): + def hlstyle(self, fg=None, bg=None, attrs=None): + return '<{fg} {bg} {attrs}>'.format(fg=fg and fg[0], bg=bg and bg[0], attrs=attrs) + + +class EvenSimplerRenderer(Renderer): + def hlstyle(self, fg=None, bg=None, attrs=None): + return '{{{fg}{bg}{attrs}}}'.format( + fg=fg and fg[0] or '-', + bg=bg and bg[0] or '-', + attrs=attrs if attrs else '', + ) + + +class TestPowerline(Powerline): + _created = False + + def __init__(self, _helpers, **kwargs): + super(TestPowerline, self).__init__(**kwargs) + self._helpers = _helpers + self.find_config_files = _helpers.find_config_files + + @staticmethod + def get_local_themes(local_themes): + return local_themes + + @staticmethod + def get_config_paths(): + return [''] + + def _will_create_renderer(self): + return self.cr_kwargs + + def _pop_events(self): + return self._helpers.pop_events() + + +renderer = EvenSimplerRenderer + + +class TestConfigLoader(ConfigLoader): + def __init__(self, _helpers, **kwargs): + watcher = TestWatcher() + super(TestConfigLoader, self).__init__( + load=_helpers.load_json_config, + watcher=watcher, + watcher_type='test', + **kwargs + ) + + +def get_powerline(config, **kwargs): + helpers = TestHelpers(config) + return get_powerline_raw( + helpers, + TestPowerline, + _helpers=helpers, + ext='test', + renderer_module='tests.modules.lib.config_mock', + logger=Logger(), + **kwargs + ) + + +def select_renderer(simpler_renderer=False): + global renderer + renderer = EvenSimplerRenderer if simpler_renderer else SimpleRenderer + + +def get_powerline_raw(helpers, PowerlineClass, replace_gcp=False, **kwargs): + if not isinstance(helpers, TestHelpers): + helpers = TestHelpers(helpers) + select_renderer(kwargs.pop('simpler_renderer', False)) + + if replace_gcp: + class PowerlineClass(PowerlineClass): + @staticmethod + def get_config_paths(): + return ['/'] + + pl = PowerlineClass( + config_loader=TestConfigLoader( + _helpers=helpers, + run_once=kwargs.get('run_once') + ), + **kwargs + ) + pl._watcher = pl.config_loader.watcher + return pl + + +def swap_attributes(config, powerline_module): + return replace_attr(powerline_module, 'os', Args( + path=Args( + isfile=lambda path: path.lstrip('/').replace('.json', '') in config, + join=os.path.join, + expanduser=lambda path: path, + realpath=lambda path: path, + dirname=os.path.dirname, + ), + environ={}, + )) + + +def add_watcher_events(p, *args, **kwargs): + if isinstance(p._watcher, TestWatcher): + p._watcher._reset(args) + while not p._will_create_renderer(): + sleep(kwargs.get('interval', 0.1)) + if not kwargs.get('wait', True): + return diff --git a/tests/modules/lib/fsconfig.py b/tests/modules/lib/fsconfig.py new file mode 100644 index 0000000..757e874 --- /dev/null +++ b/tests/modules/lib/fsconfig.py @@ -0,0 +1,83 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import os +import json + +from subprocess import check_call +from shutil import rmtree +from itertools import chain + +from powerline import Powerline + + +CONFIG_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'config') + + +class TestPowerline(Powerline): + def __init__(self, _paths, *args, **kwargs): + super(TestPowerline, self).__init__(*args, **kwargs) + self._paths = _paths + + def get_config_paths(self): + return self._paths + + +def mkdir_recursive(directory): + if os.path.isdir(directory): + return + mkdir_recursive(os.path.dirname(directory)) + os.mkdir(directory) + + +class FSTree(object): + __slots__ = ('tree', 'p', 'p_kwargs', 'create_p', 'get_config_paths', 'root') + + def __init__( + self, + tree, + p_kwargs={'run_once': True}, + root=CONFIG_DIR, + get_config_paths=lambda p: (p,), + create_p=False + ): + self.tree = tree + self.root = root + self.get_config_paths = get_config_paths + self.create_p = create_p + self.p = None + self.p_kwargs = p_kwargs + + def __enter__(self, *args): + os.mkdir(self.root) + for k, v in self.tree.items(): + fname = os.path.join(self.root, k) + '.json' + mkdir_recursive(os.path.dirname(fname)) + with open(fname, 'w') as F: + json.dump(v, F) + if self.create_p: + self.p = TestPowerline( + _paths=self.get_config_paths(self.root), + ext='test', + renderer_module='tests.modules.lib.config_mock', + **self.p_kwargs + ) + if os.environ.get('POWERLINE_RUN_LINT_DURING_TESTS'): + try: + check_call(chain(['scripts/powerline-lint'], *[ + ('-p', d) for d in ( + self.p.get_config_paths() if self.p + else self.get_config_paths(self.root) + ) + ])) + except: + self.__exit__() + raise + return self.p and self.p.__enter__(*args) + + def __exit__(self, *args): + try: + rmtree(self.root) + finally: + if self.p: + self.p.__exit__(*args) diff --git a/tests/modules/lib/terminal.py b/tests/modules/lib/terminal.py new file mode 100644 index 0000000..540135d --- /dev/null +++ b/tests/modules/lib/terminal.py @@ -0,0 +1,307 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import threading +import os + +from time import sleep +from itertools import groupby +from signal import SIGKILL +from difflib import ndiff + +import pexpect + +from powerline.lib.unicode import u + +from tests.modules.lib.vterm import VTerm, Dimensions + + +class MutableDimensions(object): + def __init__(self, rows, cols): + super(MutableDimensions, self).__init__() + self._list = [rows, cols] + + def __getitem__(self, idx): + return self._list[idx] + + def __setitem__(self, idx, val): + self._list[idx] = val + + def __iter__(self): + return iter(self._list) + + def __len__(self): + return 2 + + def __nonzero__(self): + return True + + __bool__ = __nonzero__ + + rows = property( + fget = lambda self: self._list[0], + fset = lambda self, val: self._list.__setitem__(0, val), + ) + cols = property( + fget = lambda self: self._list[1], + fset = lambda self, val: self._list.__setitem__(1, val), + ) + + +class ExpectProcess(threading.Thread): + def __init__(self, lib, dim, cmd, args, cwd=None, env=None): + super(ExpectProcess, self).__init__() + self.vterm = VTerm(lib, dim) + self.lock = threading.Lock() + self.dim = Dimensions(*dim) + self.cmd = cmd + self.args = args + self.cwd = cwd + self.env = env + self.buffer = [] + self.child_lock = threading.Lock() + self.shutdown_event = threading.Event() + self.started_event = threading.Event() + + def run(self): + with self.child_lock: + child = pexpect.spawn(self.cmd, self.args, cwd=self.cwd, + env=self.env) + sleep(0.5) + child.setwinsize(self.dim.rows, self.dim.cols) + sleep(0.5) + self.child = child + self.started_event.set() + status = None + while status is None and not self.shutdown_event.is_set(): + try: + with self.child_lock: + s = child.read_nonblocking(size=1024, timeout=0) + status = child.status + except pexpect.TIMEOUT: + pass + except pexpect.EOF: + break + else: + with self.lock: + self.vterm.push(s) + self.buffer.append(s) + + if status is None: + child.kill(SIGKILL) + + def kill(self): + self.shutdown_event.set() + + def resize(self, dim): + with self.child_lock: + self.dim = Dimensions(*dim) + self.child.setwinsize(self.dim.rows, self.dim.cols) + self.vterm.resize(self.dim) + + def __getitem__(self, position): + with self.lock: + return self.vterm.vtscreen[position] + + def read(self): + with self.lock: + ret = b''.join(self.buffer) + del self.buffer[:] + return ret + + def send(self, data): + with self.child_lock: + self.child.send(data) + + def get_highlighted_text(self, text, attrs, default_props=(), + use_escapes=False): + ret = [] + new_attrs = attrs.copy() + for cell_properties, segment_text in text: + if use_escapes: + escapes = ('\033[38;2;{0};{1};{2};48;2;{3};{4};{5}'.format( + *(cell_properties[0] + cell_properties[1]))) + ( + ';1' if cell_properties[2] else '' + ) + ( + ';3' if cell_properties[3] else '' + ) + ( + ';4' if cell_properties[4] else '' + ) + 'm' + ret.append(escapes + segment_text + '\033[0m') + else: + segment_text = segment_text.translate({'{': '{{', '}': '}}'}) + if cell_properties not in new_attrs: + new_attrs[cell_properties] = len(new_attrs) + 1 + props_name = new_attrs[cell_properties] + if props_name in default_props: + ret.append(segment_text) + else: + ret.append('{' + str(props_name) + ':' + segment_text + '}') + return ''.join(ret), new_attrs + + def get_row(self, row, attrs, default_props=(), use_escapes=False): + with self.lock: + return self.get_highlighted_text(( + (key, ''.join((cell.text for cell in subline))) + for key, subline in groupby(( + self.vterm.vtscreen[row, col] + for col in range(self.dim.cols) + ), lambda cell: cell.cell_properties_key) + ), attrs, default_props, use_escapes) + + def get_screen(self, attrs, default_props=(), use_escapes=False): + lines = [] + for row in range(self.dim.rows): + line, attrs = self.get_row(row, attrs, default_props, use_escapes) + lines.append(line) + return '\n'.join(lines), attrs + + +def test_expected_result(p, test, last_attempt, last_attempt_cb, attempts): + debugging_tests = not not os.environ.get('_POWERLINE_DEBUGGING_TESTS') + expected_text, attrs = test['expected_result'] + result = None + while attempts: + if 'row' in test: + row = test['row'] + else: + row = p.dim.rows - 1 + while row >= 0 and not p[row, 0].text: + row -= 1 + if row < 0: + row = 0 + actual_text, all_attrs = p.get_row(row, attrs) + if actual_text == expected_text: + return True + attempts -= 1 + print('Actual result does not match expected for row {0}. Attempts left: {1}.'.format( + row, attempts)) + sleep(2) + print('Result (row {0}):'.format(row)) + print(actual_text) + print('Expected:') + print(expected_text) + print('Attributes:') + for v, k in sorted( + ((v, k) for k, v in all_attrs.items()), + key=(lambda t: '%02u'.format(t[0]) if isinstance(t[0], int) else t[0]), + ): + print('{k!r}: {v!r},'.format(v=v, k=k)) + print('Screen:') + screen, screen_attrs = p.get_screen(attrs, use_escapes=debugging_tests) + print(screen) + print(screen_attrs) + print('_' * 80) + print('Diff:') + print('=' * 80) + print(''.join(( + u(line) for line in ndiff([actual_text + '\n'], [expected_text + '\n'])) + )) + if last_attempt and last_attempt_cb: + last_attempt_cb() + return False + + +ENV_BASE = { + # Reasoning: + # 1. vt* TERMs (used to be vt100 here) make tmux-1.9 use different and + # identical colors for inactive windows. This is not like tmux-1.6: + # foreground color is different from separator color and equal to (0, + # 102, 153) for some reason (separator has correct color). tmux-1.8 is + # fine, so are older versions (though tmux-1.6 and tmux-1.7 do not have + # highlighting for previously active window) and my system tmux-1.9a. + # 2. screen, xterm and some other non-256color terminals both have the same + # issue and make libvterm emit complains like `Unhandled CSI SGR 3231`. + # 3. screen-256color, xterm-256color and other -256color terminals make + # libvterm emit complains about unhandled escapes to stderr. + # 4. `st-256color` does not have any of the above problems, but it may be + # not present on the target system because it is installed with + # x11-terms/st and not with sys-libs/ncurses. + # + # For the given reasons decision was made: to fix tmux-1.9 tests and not + # make libvterm emit any data to stderr st-256color $TERM should be used, up + # until libvterm has its own terminfo database entry (if it ever will). To + # make sure that relevant terminfo entry is present on the target system it + # should be distributed with powerline test package. To make distribution + # not require modifying anything outside of powerline test directory + # TERMINFO variable is set. + # + # This fix propagates to non-tmux vterm tests just in case. + 'TERM': 'st-256color', + # Also $TERMINFO definition in get_env + + 'POWERLINE_CONFIG_PATHS': os.path.abspath('powerline/config_files'), + 'POWERLINE_COMMAND': 'powerline-render', + 'LD_LIBRARY_PATH': os.environ.get('LD_LIBRARY_PATH', ''), + 'PYTHONPATH': os.environ.get('PYTHONPATH', ''), +} + + +def get_env(vterm_path, test_dir, *args, **kwargs): + env = ENV_BASE.copy() + env.update({ + 'TERMINFO': os.path.join(test_dir, 'terminfo'), + 'PATH': vterm_path, + 'SHELL': os.path.join(vterm_path, 'bash'), + }) + env.update(*args, **kwargs) + return env + + +def do_terminal_tests(tests, cmd, dim, args, env, suite, cwd=None, fin_cb=None, + last_attempt_cb=None, attempts=None): + debugging_tests = not not os.environ.get('_POWERLINE_DEBUGGING_TESTS') + default_attempts = 2 if debugging_tests else 3 + if attempts is None: + attempts = default_attempts + lib = os.environ.get('POWERLINE_LIBVTERM') + if not lib: + if os.path.exists('tests/bot-ci/deps/libvterm/libvterm.so'): + lib = 'tests/bot-ci/deps/libvterm/libvterm.so' + else: + lib = 'libvterm.so' + + while attempts: + try: + p = ExpectProcess( + lib=lib, + dim=dim, + cmd=cmd, + args=args, + cwd=cwd, + env=env, + ) + p.start() + p.started_event.wait() + + ret = True + + for i, test in enumerate(tests): + with suite.test(test.get('name', 'test_{0}'.format(i)), + attempts - 1) as ptest: + try: + test_prep = test['prep_cb'] + except KeyError: + pass + else: + test_prep(p) + test_result = test_expected_result( + p, test, attempts == 0, last_attempt_cb, + test.get('attempts', default_attempts) + ) + if not test_result: + ptest.fail('Result does not match expected') + ret = ret and test_result + + if ret: + return ret + finally: + if fin_cb: + fin_cb(p=p, cmd=cmd, env=env) + p.kill() + p.join(10) + assert(not p.isAlive()) + + attempts -= 1 + + return False diff --git a/tests/modules/lib/vterm.py b/tests/modules/lib/vterm.py new file mode 100644 index 0000000..1984e1b --- /dev/null +++ b/tests/modules/lib/vterm.py @@ -0,0 +1,193 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import ctypes + +from collections import namedtuple + +from powerline.lib.unicode import unicode, unichr, tointiter + + +Dimensions = namedtuple('Dimensions', ('rows', 'cols')) + + +class CTypesFunction(object): + def __init__(self, library, name, rettype, args): + self.name = name + self.prototype = ctypes.CFUNCTYPE(rettype, *[ + arg[1] for arg in args + ]) + self.args = args + self.func = self.prototype((name, library), tuple(( + (1, arg[0]) for arg in args + ))) + + def __call__(self, *args, **kwargs): + return self.func(*args, **kwargs) + + def __repr__(self): + return '{cls}(<library>, {name!r}, {rettype!r}, {args!r})'.format( + cls=self.__class__.__name__, + **self.__dict__ + ) + + +class CTypesLibraryFuncsCollection(object): + def __init__(self, lib, **kwargs): + self.lib = lib + library_loader = ctypes.LibraryLoader(ctypes.CDLL) + library = library_loader.LoadLibrary(lib) + self.library = library + for name, args in kwargs.items(): + self.__dict__[name] = CTypesFunction(library, name, *args) + + +class VTermPos_s(ctypes.Structure): + _fields_ = ( + ('row', ctypes.c_int), + ('col', ctypes.c_int), + ) + + +class VTermColor_s(ctypes.Structure): + _fields_ = ( + ('red', ctypes.c_uint8), + ('green', ctypes.c_uint8), + ('blue', ctypes.c_uint8), + ) + + +class VTermScreenCellAttrs_s(ctypes.Structure): + _fields_ = ( + ('bold', ctypes.c_uint, 1), + ('underline', ctypes.c_uint, 2), + ('italic', ctypes.c_uint, 1), + ('blink', ctypes.c_uint, 1), + ('reverse', ctypes.c_uint, 1), + ('strike', ctypes.c_uint, 1), + ('font', ctypes.c_uint, 4), + ('dwl', ctypes.c_uint, 1), + ('dhl', ctypes.c_uint, 2), + ) + + +VTERM_MAX_CHARS_PER_CELL = 6 + + +class VTermScreenCell_s(ctypes.Structure): + _fields_ = ( + ('chars', ctypes.ARRAY(ctypes.c_uint32, VTERM_MAX_CHARS_PER_CELL)), + ('width', ctypes.c_char), + ('attrs', VTermScreenCellAttrs_s), + ('fg', VTermColor_s), + ('bg', VTermColor_s), + ) + + +VTerm_p = ctypes.c_void_p +VTermScreen_p = ctypes.c_void_p + + +def get_functions(lib): + return CTypesLibraryFuncsCollection( + lib, + vterm_new=(VTerm_p, ( + ('rows', ctypes.c_int), + ('cols', ctypes.c_int) + )), + vterm_obtain_screen=(VTermScreen_p, (('vt', VTerm_p),)), + vterm_set_size=(None, ( + ('vt', VTerm_p), + ('rows', ctypes.c_int), + ('cols', ctypes.c_int) + )), + vterm_screen_reset=(None, ( + ('screen', VTermScreen_p), + ('hard', ctypes.c_int) + )), + vterm_input_write=(ctypes.c_size_t, ( + ('vt', VTerm_p), + ('bytes', ctypes.POINTER(ctypes.c_char)), + ('size', ctypes.c_size_t), + )), + vterm_screen_get_cell=(ctypes.c_int, ( + ('screen', VTermScreen_p), + ('pos', VTermPos_s), + ('cell', ctypes.POINTER(VTermScreenCell_s)) + )), + vterm_free=(None, (('vt', VTerm_p),)), + vterm_set_utf8=(None, (('vt', VTerm_p), ('is_utf8', ctypes.c_int))), + ) + + +class VTermColor(object): + __slots__ = ('red', 'green', 'blue') + + def __init__(self, color): + self.red = color.red + self.green = color.green + self.blue = color.blue + + @property + def color_key(self): + return (self.red, self.green, self.blue) + + +class VTermScreenCell(object): + def __init__(self, vtsc): + for field in VTermScreenCellAttrs_s._fields_: + field_name = field[0] + setattr(self, field_name, getattr(vtsc.attrs, field_name)) + self.text = ''.join(( + unichr(vtsc.chars[i]) for i in range(VTERM_MAX_CHARS_PER_CELL) + )).rstrip('\x00') + self.width = next(tointiter(vtsc.width)) + self.fg = VTermColor(vtsc.fg) + self.bg = VTermColor(vtsc.bg) + self.cell_properties_key = ( + self.fg.color_key, + self.bg.color_key, + self.bold, + self.underline, + self.italic, + ) + + +class VTermScreen(object): + def __init__(self, functions, screen): + self.functions = functions + self.screen = screen + + def __getitem__(self, position): + pos = VTermPos_s(*position) + cell = VTermScreenCell_s() + ret = self.functions.vterm_screen_get_cell(self.screen, pos, cell) + if ret != 1: + raise ValueError('vterm_screen_get_cell returned {0}'.format(ret)) + return VTermScreenCell(cell) + + def reset(self, hard): + self.functions.vterm_screen_reset(self.screen, int(bool(hard))) + + +class VTerm(object): + def __init__(self, lib, dim): + self.functions = get_functions(lib) + self.vt = self.functions.vterm_new(dim.rows, dim.cols) + self.functions.vterm_set_utf8(self.vt, 1) + self.vtscreen = VTermScreen(self.functions, self.functions.vterm_obtain_screen(self.vt)) + self.vtscreen.reset(True) + + def push(self, data): + if isinstance(data, unicode): + data = data.encode('utf-8') + return self.functions.vterm_input_write(self.vt, data, len(data)) + + def resize(self, dim): + self.functions.vterm_set_size(self.vt, dim.rows, dim.cols) + + def __del__(self): + try: + self.functions.vterm_free(self.vt) + except AttributeError: + pass diff --git a/tests/modules/matchers.py b/tests/modules/matchers.py new file mode 100644 index 0000000..e905de3 --- /dev/null +++ b/tests/modules/matchers.py @@ -0,0 +1,6 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + + +def always_true(matcher_info): + return True diff --git a/tests/modules/vim.py b/tests/modules/vim.py new file mode 100644 index 0000000..3f6882c --- /dev/null +++ b/tests/modules/vim.py @@ -0,0 +1,927 @@ +# vim:fileencoding=utf-8:noet +_log = [] +vars = {} +vvars = {'version': 703} +_tabpage = 0 +_mode = 'n' +_buf_purge_events = set() +options = { + 'paste': 0, + 'ambiwidth': 'single', + 'columns': 80, + 'encoding': 'utf-8', +} +_last_bufnr = 0 +_highlights = {} +from collections import defaultdict as _defaultdict +_environ = _defaultdict(lambda: '') +del _defaultdict + + +_thread_id = None + + +def _set_thread_id(): + global _thread_id + from threading import current_thread + _thread_id = current_thread().ident + + +# Assuming import is done from the main thread +_set_thread_id() + + +def _print_log(): + for item in _log: + print (item) + _log[:] = () + + +def _vim(func): + from functools import wraps + from threading import current_thread + + @wraps(func) + def f(*args, **kwargs): + global _thread_id + if _thread_id != current_thread().ident: + raise RuntimeError('Accessing vim from separate threads is not allowed') + _log.append((func.__name__, args)) + return func(*args, **kwargs) + + return f + + +def _unicode(func): + from functools import wraps + import sys + + if sys.version_info < (3,): + return func + + @wraps(func) + def f(*args, **kwargs): + from powerline.lib.unicode import u + ret = func(*args, **kwargs) + if isinstance(ret, bytes): + ret = u(ret) + return ret + + return f + + +class _Buffers(object): + @_vim + def __init__(self): + self.d = {} + + @_vim + def __len__(self): + return len(self.d) + + @_vim + def __getitem__(self, item): + return self.d[item] + + @_vim + def __setitem__(self, item, value): + self.d[item] = value + + @_vim + def __iter__(self): + return iter(self.d.values()) + + @_vim + def __contains__(self, item): + return item in self.d + + @_vim + def _keys(self): + return self.d.keys() + + @_vim + def _pop(self, *args, **kwargs): + return self.d.pop(*args, **kwargs) + + +buffers = _Buffers() + + +class _ObjList(object): + @_vim + def __init__(self, objtype): + self.l = [] + self.objtype = objtype + + @_vim + def __getitem__(self, item): + return self.l[item - int(item > 0)] + + @_vim + def __len__(self): + return len(self.l) + + @_vim + def __iter__(self): + return iter(self.l) + + @_vim + def _pop(self, idx): + obj = self.l.pop(idx - 1) + for moved_obj in self.l[idx - 1:]: + moved_obj.number -= 1 + return obj + + @_vim + def _append(self, *args, **kwargs): + return self.l.append(*args, **kwargs) + + @_vim + def _new(self, *args, **kwargs): + number = len(self) + 1 + new_obj = self.objtype(number, *args, **kwargs) + self._append(new_obj) + return new_obj + + +def _construct_result(r): + import sys + if sys.version_info < (3,): + return r + else: + if isinstance(r, str): + return r.encode('utf-8') + elif isinstance(r, list): + return [_construct_result(i) for i in r] + elif isinstance(r, dict): + return dict(( + (_construct_result(k), _construct_result(v)) + for k, v in r.items() + )) + return r + + +def _str_func(func): + from functools import wraps + + @wraps(func) + def f(*args, **kwargs): + return _construct_result(func(*args, **kwargs)) + return f + + +def _log_print(): + import sys + for entry in _log: + sys.stdout.write(repr(entry) + '\n') + + +_current_group = None +_on_wipeout = [] + + +@_vim +def command(cmd): + global _current_group + cmd = cmd.lstrip() + if cmd.startswith('let g:'): + import re + varname, value = re.compile(r'^let g:(\w+)\s*=\s*(.*)').match(cmd).groups() + vars[varname] = value + elif cmd.startswith('hi '): + sp = cmd.split() + _highlights[sp[1]] = sp[2:] + elif cmd.startswith('augroup'): + augroup = cmd.partition(' ')[2] + if augroup.upper() == 'END': + _current_group = None + else: + _current_group = augroup + elif cmd.startswith('autocmd'): + rest = cmd.partition(' ')[2] + auevent, rest = rest.partition(' ')[::2] + pattern, aucmd = rest.partition(' ')[::2] + if auevent != 'BufWipeout' or pattern != '*': + raise NotImplementedError + import sys + if sys.version_info < (3,): + if not aucmd.startswith(':python '): + raise NotImplementedError + else: + if not aucmd.startswith(':python3 '): + raise NotImplementedError + _on_wipeout.append(aucmd.partition(' ')[2]) + elif cmd.startswith('set '): + if cmd.startswith('set statusline='): + options['statusline'] = cmd[len('set statusline='):] + elif cmd.startswith('set tabline='): + options['tabline'] = cmd[len('set tabline='):] + else: + raise NotImplementedError(cmd) + else: + raise NotImplementedError(cmd) + + +@_vim +@_unicode +def eval(expr): + if expr.startswith('g:'): + return vars[expr[2:]] + elif expr.startswith('v:'): + return vvars[expr[2:]] + elif expr.startswith('&'): + return options[expr[1:]] + elif expr.startswith('$'): + return _environ[expr[1:]] + elif expr.startswith('PowerlineRegisterCachePurgerEvent'): + _buf_purge_events.add(expr[expr.find('"') + 1:expr.rfind('"') - 1]) + return '0' + elif expr.startswith('exists('): + return '0' + elif expr.startswith('getwinvar('): + import re + match = re.match(r'^getwinvar\((\d+), "(\w+)"\)$', expr) + if not match: + raise NotImplementedError(expr) + winnr = int(match.group(1)) + varname = match.group(2) + return _emul_getwinvar(winnr, varname) + elif expr.startswith('has_key('): + import re + match = re.match(r'^has_key\(getwinvar\((\d+), ""\), "(\w+)"\)$', expr) + if match: + winnr = int(match.group(1)) + varname = match.group(2) + return 0 + (varname in current.tabpage.windows[winnr].vars) + else: + match = re.match(r'^has_key\(gettabwinvar\((\d+), (\d+), ""\), "(\w+)"\)$', expr) + if not match: + raise NotImplementedError(expr) + tabnr = int(match.group(1)) + winnr = int(match.group(2)) + varname = match.group(3) + return 0 + (varname in tabpages[tabnr].windows[winnr].vars) + elif expr == 'getbufvar("%", "NERDTreeRoot").path.str()': + import os + assert os.path.basename(current.buffer.name).startswith('NERD_tree_') + return '/usr/include' + elif expr.startswith('getbufvar('): + import re + match = re.match(r'^getbufvar\((\d+), ["\'](.+)["\']\)$', expr) + if not match: + raise NotImplementedError(expr) + bufnr = int(match.group(1)) + varname = match.group(2) + return _emul_getbufvar(bufnr, varname) + elif expr == 'tabpagenr()': + return current.tabpage.number + elif expr == 'tabpagenr("$")': + return len(tabpages) + elif expr.startswith('tabpagewinnr('): + tabnr = int(expr[len('tabpagewinnr('):-1]) + return tabpages[tabnr].window.number + elif expr.startswith('tabpagebuflist('): + import re + match = re.match(r'tabpagebuflist\((\d+)\)\[(\d+)\]', expr) + tabnr = int(match.group(1)) + winnr = int(match.group(2)) + 1 + return tabpages[tabnr].windows[winnr].buffer.number + elif expr.startswith('gettabwinvar('): + import re + match = re.match(r'gettabwinvar\((\d+), (\d+), "(\w+)"\)', expr) + tabnr = int(match.group(1)) + winnr = int(match.group(2)) + varname = match.group(3) + return tabpages[tabnr].windows[winnr].vars[varname] + elif expr.startswith('type(function('): + import re + match = re.match(r'^type\(function\("([^"]+)"\)\) == 2$', expr) + if not match: + raise NotImplementedError(expr) + return 0 + raise NotImplementedError(expr) + + +@_vim +def bindeval(expr): + if expr == 'g:': + return vars + elif expr == '{}': + return {} + elif expr == '[]': + return [] + import re + match = re.compile(r'^function\("([^"\\]+)"\)$').match(expr) + if match: + return globals()['_emul_' + match.group(1)] + else: + raise NotImplementedError + + +@_vim +@_str_func +def _emul_mode(*args): + if args and args[0]: + return _mode + else: + return _mode[0] + + +@_vim +@_str_func +def _emul_getbufvar(bufnr, varname): + import re + if varname[0] == '&': + if bufnr == '%': + bufnr = current.buffer.number + if bufnr not in buffers: + return '' + try: + return buffers[bufnr].options[varname[1:]] + except KeyError: + try: + return options[varname[1:]] + except KeyError: + return '' + elif re.match('^[a-zA-Z_]+$', varname): + if bufnr == '%': + bufnr = current.buffer.number + if bufnr not in buffers: + return '' + return buffers[bufnr].vars[varname] + raise NotImplementedError + + +@_vim +@_str_func +def _emul_getwinvar(winnr, varname): + return current.tabpage.windows[winnr].vars.get(varname, '') + + +@_vim +def _emul_setwinvar(winnr, varname, value): + current.tabpage.windows[winnr].vars[varname] = value + + +@_vim +def _emul_virtcol(expr): + if expr == '.': + return current.window.cursor[1] + 1 + if isinstance(expr, list) and len(expr) == 3: + return expr[-2] + expr[-1] + raise NotImplementedError + + +_v_pos = None + + +@_vim +def _emul_getpos(expr): + if expr == '.': + return [0, current.window.cursor[0] + 1, current.window.cursor[1] + 1, 0] + if expr == 'v': + return _v_pos or [0, current.window.cursor[0] + 1, current.window.cursor[1] + 1, 0] + raise NotImplementedError + + +@_vim +@_str_func +def _emul_fnamemodify(path, modstring): + import os + _modifiers = { + '~': lambda path: path.replace(os.environ['HOME'].encode('utf-8'), b'~') if path.startswith(os.environ['HOME'].encode('utf-8')) else path, + '.': lambda path: (lambda tpath: path if tpath[:3] == b'..' + os.sep.encode() else tpath)(os.path.relpath(path)), + 't': lambda path: os.path.basename(path), + 'h': lambda path: os.path.dirname(path), + } + + for mods in modstring.split(':')[1:]: + path = _modifiers[mods](path) + return path + + +@_vim +@_str_func +def _emul_expand(expr): + global _abuf + if expr == '<abuf>': + return _abuf or current.buffer.number + raise NotImplementedError + + +@_vim +def _emul_bufnr(expr): + if expr == '$': + return _last_bufnr + raise NotImplementedError + + +@_vim +def _emul_exists(ident): + if ident.startswith('g:'): + return ident[2:] in vars + elif ident.startswith(':'): + return 0 + raise NotImplementedError + + +@_vim +def _emul_line2byte(line): + buflines = current.buffer._buf_lines + if line == len(buflines) + 1: + return sum((len(s) for s in buflines)) + 1 + raise NotImplementedError + + +@_vim +def _emul_line(expr): + cursorline = current.window.cursor[0] + 1 + numlines = len(current.buffer._buf_lines) + if expr == 'w0': + return max(cursorline - 5, 1) + if expr == 'w$': + return min(cursorline + 5, numlines) + raise NotImplementedError + + +@_vim +@_str_func +def _emul_strtrans(s): + # FIXME Do more replaces + return s.replace(b'\xFF', b'<ff>') + + +@_vim +@_str_func +def _emul_bufname(bufnr): + try: + return buffers[bufnr]._name or b'' + except KeyError: + return b'' + + +_window_id = 0 + + +class _Window(object): + def __init__(self, number, buffer=None, cursor=(1, 0), width=80): + global _window_id + self.cursor = cursor + self.width = width + self.number = number + if buffer: + if type(buffer) is _Buffer: + self.buffer = buffer + else: + self.buffer = _Buffer(**buffer) + else: + self.buffer = _Buffer() + _window_id += 1 + self._window_id = _window_id + self.options = {} + self.vars = { + 'powerline_window_id': self._window_id, + } + + def __repr__(self): + return '<window ' + str(self.number - 1) + '>' + + +class _Tabpage(object): + def __init__(self, number): + self.windows = _ObjList(_Window) + self.number = number + + def _new_window(self, **kwargs): + self.window = self.windows._new(**kwargs) + return self.window + + def _close_window(self, winnr, open_window=True): + curwinnr = self.window.number + win = self.windows._pop(winnr) + if self.windows and winnr == curwinnr: + self.window = self.windows[-1] + elif open_window: + current.tabpage._new_window() + return win + + def _close(self): + global _tabpage + while self.windows: + self._close_window(1, False) + tabpages._pop(self.number) + _tabpage = len(tabpages) + + +tabpages = _ObjList(_Tabpage) + + +_abuf = None + + +class _Buffer(object): + def __init__(self, name=None): + global _last_bufnr + _last_bufnr += 1 + bufnr = _last_bufnr + self.number = bufnr + # FIXME Use unicode() for python-3 + self.name = name + self.vars = {'changedtick': 1} + self.options = { + 'modified': 0, + 'readonly': 0, + 'fileformat': 'unix', + 'filetype': '', + 'buftype': '', + 'fileencoding': 'utf-8', + 'textwidth': 80, + } + self._buf_lines = [''] + self._undostate = [self._buf_lines[:]] + self._undo_written = len(self._undostate) + buffers[bufnr] = self + + @property + def name(self): + import sys + if sys.version_info < (3,): + return self._name + else: + return str(self._name, 'utf-8') if self._name else None + + @name.setter + def name(self, name): + if name is None: + self._name = None + else: + import os + if type(name) is not bytes: + name = name.encode('utf-8') + if b':/' in name: + self._name = name + else: + self._name = os.path.abspath(name) + + def __getitem__(self, line): + return self._buf_lines[line] + + def __setitem__(self, line, value): + self.options['modified'] = 1 + self.vars['changedtick'] += 1 + self._buf_lines[line] = value + from copy import copy + self._undostate.append(copy(self._buf_lines)) + + def __setslice__(self, *args): + self.options['modified'] = 1 + self.vars['changedtick'] += 1 + self._buf_lines.__setslice__(*args) + from copy import copy + self._undostate.append(copy(self._buf_lines)) + + def __getslice__(self, *args): + return self._buf_lines.__getslice__(*args) + + def __len__(self): + return len(self._buf_lines) + + def __repr__(self): + return '<buffer ' + str(self.name) + '>' + + def __del__(self): + global _abuf + bufnr = self.number + try: + import __main__ + except ImportError: + pass + except RuntimeError: + # Module may have already been garbage-collected + pass + else: + if _on_wipeout: + _abuf = bufnr + try: + for event in _on_wipeout: + exec(event, __main__.__dict__) + finally: + _abuf = None + + +class _Current(object): + @property + def buffer(self): + return self.window.buffer + + @property + def window(self): + return self.tabpage.window + + @property + def tabpage(self): + return tabpages[_tabpage - 1] + + +current = _Current() + + +_dict = None + + +@_vim +def _init(): + global _dict + + if _dict: + return _dict + + _dict = {} + for varname, value in globals().items(): + if varname[0] != '_': + _dict[varname] = value + _tabnew() + return _dict + + +@_vim +def _get_segment_info(): + mode_translations = { + chr(ord('V') - 0x40): '^V', + chr(ord('S') - 0x40): '^S', + } + mode = _mode + mode = mode_translations.get(mode, mode) + window = current.window + buffer = current.buffer + tabpage = current.tabpage + return { + 'window': window, + 'winnr': window.number, + 'buffer': buffer, + 'bufnr': buffer.number, + 'tabpage': tabpage, + 'tabnr': tabpage.number, + 'window_id': window._window_id, + 'mode': mode, + 'encoding': options['encoding'], + } + + +@_vim +def _launch_event(event): + pass + + +@_vim +def _start_mode(mode): + global _mode + if mode == 'i': + _launch_event('InsertEnter') + elif _mode == 'i': + _launch_event('InsertLeave') + _mode = mode + + +@_vim +def _undo(): + if len(current.buffer._undostate) == 1: + return + buffer = current.buffer + buffer._undostate.pop(-1) + buffer._buf_lines = buffer._undostate[-1] + if buffer._undo_written == len(buffer._undostate): + buffer.options['modified'] = 0 + + +@_vim +def _edit(name=None): + if current.buffer.name is None: + buffer = current.buffer + buffer.name = name + else: + buffer = _Buffer(name) + current.window.buffer = buffer + + +@_vim +def _tabnew(name=None): + global windows + global _tabpage + tabpage = tabpages._new() + windows = tabpage.windows + _tabpage = len(tabpages) + _new(name) + return tabpage + + +@_vim +def _new(name=None): + current.tabpage._new_window(buffer={'name': name}) + + +@_vim +def _split(): + current.tabpage._new_window(buffer=current.buffer) + + +@_vim +def _close(winnr, wipe=True): + win = current.tabpage._close_window(winnr) + if wipe: + for w in current.tabpage.windows: + if w.buffer.number == win.buffer.number: + break + else: + _bw(win.buffer.number) + + +@_vim +def _bw(bufnr=None): + bufnr = bufnr or current.buffer.number + winnr = 1 + for win in current.tabpage.windows: + if win.buffer.number == bufnr: + _close(winnr, wipe=False) + winnr += 1 + buffers._pop(bufnr) + if not buffers: + _Buffer() + _b(max(buffers._keys())) + + +@_vim +def _b(bufnr): + current.window.buffer = buffers[bufnr] + + +@_vim +def _set_cursor(line, col): + current.window.cursor = (line, col) + if _mode == 'n': + _launch_event('CursorMoved') + elif _mode == 'i': + _launch_event('CursorMovedI') + + +@_vim +def _get_buffer(): + return current.buffer + + +@_vim +def _set_bufoption(option, value, bufnr=None): + buffers[bufnr or current.buffer.number].options[option] = value + if option == 'filetype': + _launch_event('FileType') + + +class _WithNewBuffer(object): + def __init__(self, func, *args, **kwargs): + self.call = lambda: func(*args, **kwargs) + + def __enter__(self): + self.call() + self.bufnr = current.buffer.number + return _get_segment_info() + + def __exit__(self, *args): + _bw(self.bufnr) + + +@_vim +def _set_dict(d, new, setfunc=None): + if not setfunc: + def setfunc(k, v): + d[k] = v + + old = {} + na = [] + for k, v in new.items(): + try: + old[k] = d[k] + except KeyError: + na.append(k) + setfunc(k, v) + return old, na + + +class _WithBufOption(object): + def __init__(self, **new): + self.new = new + + def __enter__(self): + self.buffer = current.buffer + self.old = _set_dict(self.buffer.options, self.new, _set_bufoption)[0] + + def __exit__(self, *args): + self.buffer.options.update(self.old) + + +class _WithMode(object): + def __init__(self, new): + self.new = new + + def __enter__(self): + self.old = _mode + _start_mode(self.new) + return _get_segment_info() + + def __exit__(self, *args): + _start_mode(self.old) + + +class _WithDict(object): + def __init__(self, d, **new): + self.new = new + self.d = d + + def __enter__(self): + self.old, self.na = _set_dict(self.d, self.new) + + def __exit__(self, *args): + self.d.update(self.old) + for k in self.na: + self.d.pop(k) + + +class _WithSplit(object): + def __enter__(self): + _split() + + def __exit__(self, *args): + _close(2, wipe=False) + + +class _WithBufName(object): + def __init__(self, new): + self.new = new + + def __enter__(self): + import os + buffer = current.buffer + self.buffer = buffer + self.old = buffer.name + buffer.name = self.new + + def __exit__(self, *args): + self.buffer.name = self.old + + +class _WithNewTabPage(object): + def __init__(self, *args, **kwargs): + self.args = args + self.kwargs = kwargs + + def __enter__(self): + self.tab = _tabnew(*self.args, **self.kwargs) + + def __exit__(self, *args): + self.tab._close() + + +class _WithGlobal(object): + def __init__(self, **kwargs): + self.kwargs = kwargs + + def __enter__(self): + self.empty = object() + self.old = dict(((key, globals().get(key, self.empty)) for key in self.kwargs)) + globals().update(self.kwargs) + + def __exit__(self, *args): + for k, v in self.old.items(): + if v is self.empty: + globals().pop(k, None) + else: + globals()[k] = v + + +@_vim +def _with(key, *args, **kwargs): + if key == 'buffer': + return _WithNewBuffer(_edit, *args, **kwargs) + elif key == 'bufname': + return _WithBufName(*args, **kwargs) + elif key == 'mode': + return _WithMode(*args, **kwargs) + elif key == 'bufoptions': + return _WithBufOption(**kwargs) + elif key == 'options': + return _WithDict(options, **kwargs) + elif key == 'globals': + return _WithDict(vars, **kwargs) + elif key == 'wvars': + return _WithDict(current.window.vars, **kwargs) + elif key == 'environ': + return _WithDict(_environ, **kwargs) + elif key == 'split': + return _WithSplit() + elif key == 'tabpage': + return _WithNewTabPage(*args, **kwargs) + elif key == 'vpos': + return _WithGlobal(_v_pos=[0, kwargs['line'], kwargs['col'], kwargs['off']]) + + +class error(Exception): + pass |