summaryrefslogtreecommitdiffstats
path: root/tests/modules
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-06 02:04:06 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-06 02:04:06 +0000
commita8637fe80c24fb04bf6cc8ae8459877535f1341b (patch)
tree5d263b4543e10940f5e9a79a8fe981c5a3414bd7 /tests/modules
parentInitial commit. (diff)
downloadpowerline-a8637fe80c24fb04bf6cc8ae8459877535f1341b.tar.xz
powerline-a8637fe80c24fb04bf6cc8ae8459877535f1341b.zip
Adding upstream version 2.7.upstream/2.7upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'tests/modules')
-rw-r--r--tests/modules/__init__.py94
-rw-r--r--tests/modules/lib/__init__.py183
-rw-r--r--tests/modules/lib/config_mock.py230
-rw-r--r--tests/modules/lib/fsconfig.py83
-rw-r--r--tests/modules/lib/terminal.py307
-rw-r--r--tests/modules/lib/vterm.py193
-rw-r--r--tests/modules/matchers.py6
-rw-r--r--tests/modules/vim.py927
8 files changed, 2023 insertions, 0 deletions
diff --git a/tests/modules/__init__.py b/tests/modules/__init__.py
new file mode 100644
index 0000000..12aae20
--- /dev/null
+++ b/tests/modules/__init__.py
@@ -0,0 +1,94 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import sys
+import os
+
+if sys.version_info < (2, 7):
+ from unittest2 import TestCase as _TestCase # NOQA
+ from unittest2 import main as _main # NOQA
+ from unittest2.case import SkipTest # NOQA
+else:
+ from unittest import TestCase as _TestCase # NOQA
+ from unittest import main as _main # NOQA
+ from unittest.case import SkipTest # NOQA
+
+from tests.modules.lib import PowerlineSingleTest
+
+
+class PowerlineDummyTest(object):
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args):
+ pass
+
+ def fail(self, *args, **kwargs):
+ pass
+
+ def exception(self, *args, **kwargs):
+ pass
+
+
+class PowerlineTestSuite(object):
+ def __init__(self, name):
+ self.name = name
+
+ def __enter__(self):
+ self.saved_current_suite = os.environ['POWERLINE_CURRENT_SUITE']
+ os.environ['POWERLINE_CURRENT_SUITE'] = (
+ self.saved_current_suite + '/' + self.name)
+ self.suite = self.saved_current_suite + '/' + self.name
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ if exc_type is not None:
+ self.exception(
+ 'suite_noexcept',
+ 'Exception while running test suite: {0!r}'.format(exc_value),
+ )
+ os.environ['POWERLINE_CURRENT_SUITE'] = self.saved_current_suite
+
+ def record_test_failure(self, fail_char, test_name, message, allow_failure=False):
+ if allow_failure:
+ fail_char = 'A' + fail_char
+ full_msg = '{fail_char} {suite}|{test_name} :: {message}'.format(
+ fail_char=fail_char,
+ suite=self.suite,
+ test_name=test_name,
+ message=message,
+ )
+ with open(os.environ['FAILURES_FILE'], 'a') as ffd:
+ ffd.write(full_msg + '\n')
+ return False
+
+ def exception(self, test_name, message, allow_failure=False):
+ return self.record_test_failure('E', test_name, message, allow_failure)
+
+ def fail(self, test_name, message, allow_failure=False):
+ return self.record_test_failure('F', test_name, message, allow_failure)
+
+ def test(self, name, attempts_left=0):
+ if not attempts_left:
+ return PowerlineSingleTest(self, name)
+ else:
+ return PowerlineDummyTest()
+
+ def subsuite(self, name):
+ return PowerlineTestSuite(name)
+
+
+suite = None
+
+
+def main(*args, **kwargs):
+ global suite
+ suite = PowerlineTestSuite(sys.argv[0])
+ _main(*args, **kwargs)
+
+
+class TestCase(_TestCase):
+ def fail(self, msg=None):
+ suite.fail(self.__class__.__name__,
+ msg or 'Test failed without message')
+ super(TestCase, self).fail(*args, **kwargs)
diff --git a/tests/modules/lib/__init__.py b/tests/modules/lib/__init__.py
new file mode 100644
index 0000000..d45ccae
--- /dev/null
+++ b/tests/modules/lib/__init__.py
@@ -0,0 +1,183 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import imp
+import sys
+
+
+class Pl(object):
+ def __init__(self):
+ self.exceptions = []
+ self.errors = []
+ self.warns = []
+ self.debugs = []
+ self.infos = []
+ self.prefix = None
+ self.use_daemon_threads = True
+
+ for meth in ('error', 'warn', 'debug', 'exception', 'info'):
+ exec((
+ 'def {0}(self, msg, *args, **kwargs):\n'
+ ' self.{0}s.append((kwargs.get("prefix") or self.prefix, msg, args, kwargs))\n'
+ ).format(meth))
+
+ def __nonzero__(self):
+ return bool(self.exceptions or self.errors or self.warns)
+
+ __bool__ = __nonzero__
+
+
+class Args(object):
+ theme_override = {}
+ config_override = {}
+ config_path = None
+ ext = ['shell']
+ renderer_module = None
+
+ def __init__(self, **kwargs):
+ self.__dict__.update(kwargs)
+
+
+def urllib_read(query_url):
+ if query_url.startswith('http://ipv'):
+ if query_url.startswith('http://ipv4.icanhazip.com'):
+ return '127.0.0.1'
+ elif query_url.startswith('http://ipv4.icanhazip.com'):
+ return '2001:4801:7818:6:abc5:ba2c:ff10:275f'
+ elif query_url.startswith('http://geoip.nekudo.com/api/'):
+ return '{"city":"Meppen","country":{"name":"Germany", "code":"DE"},"location":{"accuracy_radius":100,"latitude":52.6833,"longitude":7.3167,"time_zone":"Europe\/Berlin"},"ip":"82.145.55.16"}'
+ elif query_url.startswith('http://query.yahooapis.com/v1/public/'):
+ if 'Meppen' in query_url:
+ return r'{"query":{"count":1,"created":"2016-05-13T19:43:18Z","lang":"en-US","results":{"channel":{"units":{"distance":"mi","pressure":"in","speed":"mph","temperature":"C"},"title":"Yahoo! Weather - Meppen, NI, DE","link":"http://us.rd.yahoo.com/dailynews/rss/weather/Country__Country/*https://weather.yahoo.com/country/state/city-674836/","description":"Yahoo! Weather for Meppen, NI, DE","language":"en-us","lastBuildDate":"Fri, 13 May 2016 09:43 PM CEST","ttl":"60","location":{"city":"Meppen","country":"Germany","region":" NI"},"wind":{"chill":"55","direction":"350","speed":"25"},"atmosphere":{"humidity":"57","pressure":"1004.0","rising":"0","visibility":"16.1"},"astronomy":{"sunrise":"5:35 am","sunset":"9:21 pm"},"image":{"title":"Yahoo! Weather","width":"142","height":"18","link":"http://weather.yahoo.com","url":"http://l.yimg.com/a/i/brand/purplelogo//uh/us/news-wea.gif"},"item":{"title":"Conditions for Meppen, NI, DE at 08:00 PM CEST","lat":"52.68993","long":"7.29115","link":"http://us.rd.yahoo.com/dailynews/rss/weather/Country__Country/*https://weather.yahoo.com/country/state/city-674836/","pubDate":"Fri, 13 May 2016 08:00 PM CEST","condition":{"code":"23","date":"Fri, 13 May 2016 08:00 PM CEST","temp":"14","text":"Breezy"},"forecast":[{"code":"30","date":"13 May 2016","day":"Fri","high":"71","low":"48","text":"Partly Cloudy"},{"code":"28","date":"14 May 2016","day":"Sat","high":"54","low":"44","text":"Mostly Cloudy"},{"code":"11","date":"15 May 2016","day":"Sun","high":"55","low":"43","text":"Showers"},{"code":"28","date":"16 May 2016","day":"Mon","high":"54","low":"42","text":"Mostly Cloudy"},{"code":"28","date":"17 May 2016","day":"Tue","high":"57","low":"43","text":"Mostly Cloudy"},{"code":"12","date":"18 May 2016","day":"Wed","high":"62","low":"45","text":"Rain"},{"code":"28","date":"19 May 2016","day":"Thu","high":"63","low":"48","text":"Mostly Cloudy"},{"code":"28","date":"20 May 2016","day":"Fri","high":"67","low":"50","text":"Mostly Cloudy"},{"code":"30","date":"21 May 2016","day":"Sat","high":"71","low":"50","text":"Partly Cloudy"},{"code":"30","date":"22 May 2016","day":"Sun","high":"74","low":"54","text":"Partly Cloudy"}],"description":"<![CDATA[<img src=\"http://l.yimg.com/a/i/us/we/52/23.gif\"/>\n<BR />\n<b>Current Conditions:</b>\n<BR />Breezy\n<BR />\n<BR />\n<b>Forecast:</b>\n<BR /> Fri - Partly Cloudy. High: 71Low: 48\n<BR /> Sat - Mostly Cloudy. High: 54Low: 44\n<BR /> Sun - Showers. High: 55Low: 43\n<BR /> Mon - Mostly Cloudy. High: 54Low: 42\n<BR /> Tue - Mostly Cloudy. High: 57Low: 43\n<BR />\n<BR />\n<a href=\"http://us.rd.yahoo.com/dailynews/rss/weather/Country__Country/*https://weather.yahoo.com/country/state/city-674836/\">Full Forecast at Yahoo! Weather</a>\n<BR />\n<BR />\n(provided by <a href=\"http://www.weather.com\" >The Weather Channel</a>)\n<BR />\n]]>","guid":{"isPermaLink":"false"}}}}}}'
+ elif 'Moscow' in query_url:
+ return r'{"query":{"count":1,"created":"2016-05-13T19:47:01Z","lang":"en-US","results":{"channel":{"units":{"distance":"mi","pressure":"in","speed":"mph","temperature":"C"},"title":"Yahoo! Weather - Moscow, Moscow Federal City, RU","link":"http://us.rd.yahoo.com/dailynews/rss/weather/Country__Country/*https://weather.yahoo.com/country/state/city-2122265/","description":"Yahoo! Weather for Moscow, Moscow Federal City, RU","language":"en-us","lastBuildDate":"Fri, 13 May 2016 10:47 PM MSK","ttl":"60","location":{"city":"Moscow","country":"Russia","region":" Moscow Federal City"},"wind":{"chill":"45","direction":"80","speed":"11"},"atmosphere":{"humidity":"52","pressure":"993.0","rising":"0","visibility":"16.1"},"astronomy":{"sunrise":"4:19 am","sunset":"8:34 pm"},"image":{"title":"Yahoo! Weather","width":"142","height":"18","link":"http://weather.yahoo.com","url":"http://l.yimg.com/a/i/brand/purplelogo//uh/us/news-wea.gif"},"item":{"title":"Conditions for Moscow, Moscow Federal City, RU at 09:00 PM MSK","lat":"55.741638","long":"37.605061","link":"http://us.rd.yahoo.com/dailynews/rss/weather/Country__Country/*https://weather.yahoo.com/country/state/city-2122265/","pubDate":"Fri, 13 May 2016 09:00 PM MSK","condition":{"code":"33","date":"Fri, 13 May 2016 09:00 PM MSK","temp":"9","text":"Mostly Clear"},"forecast":[{"code":"30","date":"13 May 2016","day":"Fri","high":"62","low":"41","text":"Partly Cloudy"},{"code":"30","date":"14 May 2016","day":"Sat","high":"64","low":"43","text":"Partly Cloudy"},{"code":"30","date":"15 May 2016","day":"Sun","high":"63","low":"44","text":"Partly Cloudy"},{"code":"12","date":"16 May 2016","day":"Mon","high":"60","low":"47","text":"Rain"},{"code":"12","date":"17 May 2016","day":"Tue","high":"64","low":"48","text":"Rain"},{"code":"28","date":"18 May 2016","day":"Wed","high":"67","low":"48","text":"Mostly Cloudy"},{"code":"12","date":"19 May 2016","day":"Thu","high":"68","low":"49","text":"Rain"},{"code":"39","date":"20 May 2016","day":"Fri","high":"66","low":"50","text":"Scattered Showers"},{"code":"39","date":"21 May 2016","day":"Sat","high":"69","low":"49","text":"Scattered Showers"},{"code":"30","date":"22 May 2016","day":"Sun","high":"73","low":"50","text":"Partly Cloudy"}],"description":"<![CDATA[<img src=\"http://l.yimg.com/a/i/us/we/52/33.gif\"/>\n<BR />\n<b>Current Conditions:</b>\n<BR />Mostly Clear\n<BR />\n<BR />\n<b>Forecast:</b>\n<BR /> Fri - Partly Cloudy. High: 62Low: 41\n<BR /> Sat - Partly Cloudy. High: 64Low: 43\n<BR /> Sun - Partly Cloudy. High: 63Low: 44\n<BR /> Mon - Rain. High: 60Low: 47\n<BR /> Tue - Rain. High: 64Low: 48\n<BR />\n<BR />\n<a href=\"http://us.rd.yahoo.com/dailynews/rss/weather/Country__Country/*https://weather.yahoo.com/country/state/city-2122265/\">Full Forecast at Yahoo! Weather</a>\n<BR />\n<BR />\n(provided by <a href=\"http://www.weather.com\" >The Weather Channel</a>)\n<BR />\n]]>","guid":{"isPermaLink":"false"}}}}}}'
+ else:
+ raise NotImplementedError
+
+
+class Process(object):
+ def __init__(self, output, err):
+ self.output = output
+ self.err = err
+
+ def communicate(self):
+ return self.output, self.err
+
+
+class ModuleReplace(object):
+ def __init__(self, name, new):
+ self.name = name
+ self.new = new
+
+ def __enter__(self):
+ self.old = sys.modules.get(self.name)
+ if not self.old:
+ try:
+ self.old = __import__(self.name)
+ except ImportError:
+ pass
+ sys.modules[self.name] = self.new
+
+ def __exit__(self, *args):
+ if self.old:
+ sys.modules[self.name] = self.old
+ else:
+ sys.modules.pop(self.name)
+
+
+def replace_module(name, new=None, **kwargs):
+ if not new:
+ new = new_module(name, **kwargs)
+ return ModuleReplace(name, new)
+
+
+def new_module(name, **kwargs):
+ module = imp.new_module(name)
+ for k, v in kwargs.items():
+ setattr(module, k, v)
+ return module
+
+
+class AttrReplace(object):
+ def __init__(self, obj, *args):
+ self.obj = obj
+ self.attrs = args[::2]
+ self.new = args[1::2]
+
+ def __enter__(self):
+ self.old = {}
+ for i, attr in enumerate(self.attrs):
+ try:
+ self.old[i] = getattr(self.obj, attr)
+ except AttributeError:
+ pass
+ for attr, new in zip(self.attrs, self.new):
+ setattr(self.obj, attr, new)
+
+ def __exit__(self, *args):
+ for i, attr in enumerate(self.attrs):
+ try:
+ old = self.old[i]
+ except KeyError:
+ delattr(self.obj, attr)
+ else:
+ setattr(self.obj, attr, old)
+
+
+replace_attr = AttrReplace
+
+
+def replace_module_module(module, name, **kwargs):
+ return replace_attr(module, name, new_module(name, **kwargs))
+
+
+class ItemReplace(object):
+ def __init__(self, d, key, new, r=None):
+ self.key = key
+ self.new = new
+ self.d = d
+ self.r = r
+
+ def __enter__(self):
+ self.old = self.d.get(self.key)
+ self.d[self.key] = self.new
+ return self.r
+
+ def __exit__(self, *args):
+ if self.old is None:
+ try:
+ self.d.pop(self.key)
+ except KeyError:
+ pass
+ else:
+ self.d[self.key] = self.old
+
+
+def replace_item(d, key, new):
+ return ItemReplace(d, key, new, d)
+
+
+def replace_env(key, new, environ=None, **kwargs):
+ r = kwargs.copy()
+ r['environ'] = environ or {}
+ return ItemReplace(r['environ'], key, new, r)
+
+
+class PowerlineSingleTest(object):
+ def __init__(self, suite, name):
+ self.suite = suite
+ self.name = name
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ if exc_type is not None:
+ self.exception('Exception while running test: {0!r}'.format(
+ exc_value))
+
+ def fail(self, message, allow_failure=False):
+ return self.suite.fail(self.name, message, allow_failure)
+
+ def exception(self, message, allow_failure=False):
+ return self.suite.exception(self.name, message, allow_failure)
diff --git a/tests/modules/lib/config_mock.py b/tests/modules/lib/config_mock.py
new file mode 100644
index 0000000..900b60f
--- /dev/null
+++ b/tests/modules/lib/config_mock.py
@@ -0,0 +1,230 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import os
+
+from threading import Lock
+from copy import deepcopy
+from time import sleep
+from functools import wraps
+
+from powerline.renderer import Renderer
+from powerline.lib.config import ConfigLoader
+from powerline import Powerline, get_default_theme
+
+from tests.modules.lib import Args, replace_attr
+
+
+UT = get_default_theme(is_unicode=True)
+AT = get_default_theme(is_unicode=False)
+
+
+class TestHelpers(object):
+ def __init__(self, config):
+ self.config = config
+ self.access_log = []
+ self.access_lock = Lock()
+
+ def loader_condition(self, path):
+ return (path in self.config) and path
+
+ def find_config_files(self, cfg_path, config_loader, loader_callback):
+ if cfg_path.endswith('.json'):
+ cfg_path = cfg_path[:-5]
+ if cfg_path.startswith('/'):
+ cfg_path = cfg_path.lstrip('/')
+ with self.access_lock:
+ self.access_log.append('check:' + cfg_path)
+ if cfg_path in self.config:
+ yield cfg_path
+ else:
+ if config_loader:
+ config_loader.register_missing(self.loader_condition, loader_callback, cfg_path)
+ raise IOError(('fcf:' if cfg_path.endswith('raise') else '') + cfg_path)
+
+ def load_json_config(self, config_file_path, *args, **kwargs):
+ if config_file_path.endswith('.json'):
+ config_file_path = config_file_path[:-5]
+ if config_file_path.startswith('/'):
+ config_file_path = config_file_path.lstrip('/')
+ with self.access_lock:
+ self.access_log.append('load:' + config_file_path)
+ try:
+ return deepcopy(self.config[config_file_path])
+ except KeyError:
+ raise IOError(config_file_path)
+
+ def pop_events(self):
+ with self.access_lock:
+ r = self.access_log[:]
+ self.access_log = []
+ return r
+
+
+def log_call(func):
+ @wraps(func)
+ def ret(self, *args, **kwargs):
+ self._calls.append((func.__name__, args, kwargs))
+ return func(self, *args, **kwargs)
+ return ret
+
+
+class TestWatcher(object):
+ events = set()
+ lock = Lock()
+
+ def __init__(self):
+ self._calls = []
+
+ @log_call
+ def watch(self, file):
+ pass
+
+ @log_call
+ def __call__(self, file):
+ with self.lock:
+ if file in self.events:
+ self.events.remove(file)
+ return True
+ return False
+
+ def _reset(self, files):
+ with self.lock:
+ self.events.clear()
+ self.events.update(files)
+
+ @log_call
+ def unsubscribe(self):
+ pass
+
+
+class Logger(object):
+ def __init__(self):
+ self.messages = []
+ self.lock = Lock()
+
+ def _add_msg(self, attr, msg):
+ with self.lock:
+ self.messages.append(attr + ':' + msg)
+
+ def _pop_msgs(self):
+ with self.lock:
+ r = self.messages
+ self.messages = []
+ return r
+
+ def __getattr__(self, attr):
+ return lambda *args, **kwargs: self._add_msg(attr, *args, **kwargs)
+
+
+class SimpleRenderer(Renderer):
+ def hlstyle(self, fg=None, bg=None, attrs=None):
+ return '<{fg} {bg} {attrs}>'.format(fg=fg and fg[0], bg=bg and bg[0], attrs=attrs)
+
+
+class EvenSimplerRenderer(Renderer):
+ def hlstyle(self, fg=None, bg=None, attrs=None):
+ return '{{{fg}{bg}{attrs}}}'.format(
+ fg=fg and fg[0] or '-',
+ bg=bg and bg[0] or '-',
+ attrs=attrs if attrs else '',
+ )
+
+
+class TestPowerline(Powerline):
+ _created = False
+
+ def __init__(self, _helpers, **kwargs):
+ super(TestPowerline, self).__init__(**kwargs)
+ self._helpers = _helpers
+ self.find_config_files = _helpers.find_config_files
+
+ @staticmethod
+ def get_local_themes(local_themes):
+ return local_themes
+
+ @staticmethod
+ def get_config_paths():
+ return ['']
+
+ def _will_create_renderer(self):
+ return self.cr_kwargs
+
+ def _pop_events(self):
+ return self._helpers.pop_events()
+
+
+renderer = EvenSimplerRenderer
+
+
+class TestConfigLoader(ConfigLoader):
+ def __init__(self, _helpers, **kwargs):
+ watcher = TestWatcher()
+ super(TestConfigLoader, self).__init__(
+ load=_helpers.load_json_config,
+ watcher=watcher,
+ watcher_type='test',
+ **kwargs
+ )
+
+
+def get_powerline(config, **kwargs):
+ helpers = TestHelpers(config)
+ return get_powerline_raw(
+ helpers,
+ TestPowerline,
+ _helpers=helpers,
+ ext='test',
+ renderer_module='tests.modules.lib.config_mock',
+ logger=Logger(),
+ **kwargs
+ )
+
+
+def select_renderer(simpler_renderer=False):
+ global renderer
+ renderer = EvenSimplerRenderer if simpler_renderer else SimpleRenderer
+
+
+def get_powerline_raw(helpers, PowerlineClass, replace_gcp=False, **kwargs):
+ if not isinstance(helpers, TestHelpers):
+ helpers = TestHelpers(helpers)
+ select_renderer(kwargs.pop('simpler_renderer', False))
+
+ if replace_gcp:
+ class PowerlineClass(PowerlineClass):
+ @staticmethod
+ def get_config_paths():
+ return ['/']
+
+ pl = PowerlineClass(
+ config_loader=TestConfigLoader(
+ _helpers=helpers,
+ run_once=kwargs.get('run_once')
+ ),
+ **kwargs
+ )
+ pl._watcher = pl.config_loader.watcher
+ return pl
+
+
+def swap_attributes(config, powerline_module):
+ return replace_attr(powerline_module, 'os', Args(
+ path=Args(
+ isfile=lambda path: path.lstrip('/').replace('.json', '') in config,
+ join=os.path.join,
+ expanduser=lambda path: path,
+ realpath=lambda path: path,
+ dirname=os.path.dirname,
+ ),
+ environ={},
+ ))
+
+
+def add_watcher_events(p, *args, **kwargs):
+ if isinstance(p._watcher, TestWatcher):
+ p._watcher._reset(args)
+ while not p._will_create_renderer():
+ sleep(kwargs.get('interval', 0.1))
+ if not kwargs.get('wait', True):
+ return
diff --git a/tests/modules/lib/fsconfig.py b/tests/modules/lib/fsconfig.py
new file mode 100644
index 0000000..757e874
--- /dev/null
+++ b/tests/modules/lib/fsconfig.py
@@ -0,0 +1,83 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import os
+import json
+
+from subprocess import check_call
+from shutil import rmtree
+from itertools import chain
+
+from powerline import Powerline
+
+
+CONFIG_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'config')
+
+
+class TestPowerline(Powerline):
+ def __init__(self, _paths, *args, **kwargs):
+ super(TestPowerline, self).__init__(*args, **kwargs)
+ self._paths = _paths
+
+ def get_config_paths(self):
+ return self._paths
+
+
+def mkdir_recursive(directory):
+ if os.path.isdir(directory):
+ return
+ mkdir_recursive(os.path.dirname(directory))
+ os.mkdir(directory)
+
+
+class FSTree(object):
+ __slots__ = ('tree', 'p', 'p_kwargs', 'create_p', 'get_config_paths', 'root')
+
+ def __init__(
+ self,
+ tree,
+ p_kwargs={'run_once': True},
+ root=CONFIG_DIR,
+ get_config_paths=lambda p: (p,),
+ create_p=False
+ ):
+ self.tree = tree
+ self.root = root
+ self.get_config_paths = get_config_paths
+ self.create_p = create_p
+ self.p = None
+ self.p_kwargs = p_kwargs
+
+ def __enter__(self, *args):
+ os.mkdir(self.root)
+ for k, v in self.tree.items():
+ fname = os.path.join(self.root, k) + '.json'
+ mkdir_recursive(os.path.dirname(fname))
+ with open(fname, 'w') as F:
+ json.dump(v, F)
+ if self.create_p:
+ self.p = TestPowerline(
+ _paths=self.get_config_paths(self.root),
+ ext='test',
+ renderer_module='tests.modules.lib.config_mock',
+ **self.p_kwargs
+ )
+ if os.environ.get('POWERLINE_RUN_LINT_DURING_TESTS'):
+ try:
+ check_call(chain(['scripts/powerline-lint'], *[
+ ('-p', d) for d in (
+ self.p.get_config_paths() if self.p
+ else self.get_config_paths(self.root)
+ )
+ ]))
+ except:
+ self.__exit__()
+ raise
+ return self.p and self.p.__enter__(*args)
+
+ def __exit__(self, *args):
+ try:
+ rmtree(self.root)
+ finally:
+ if self.p:
+ self.p.__exit__(*args)
diff --git a/tests/modules/lib/terminal.py b/tests/modules/lib/terminal.py
new file mode 100644
index 0000000..540135d
--- /dev/null
+++ b/tests/modules/lib/terminal.py
@@ -0,0 +1,307 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import threading
+import os
+
+from time import sleep
+from itertools import groupby
+from signal import SIGKILL
+from difflib import ndiff
+
+import pexpect
+
+from powerline.lib.unicode import u
+
+from tests.modules.lib.vterm import VTerm, Dimensions
+
+
+class MutableDimensions(object):
+ def __init__(self, rows, cols):
+ super(MutableDimensions, self).__init__()
+ self._list = [rows, cols]
+
+ def __getitem__(self, idx):
+ return self._list[idx]
+
+ def __setitem__(self, idx, val):
+ self._list[idx] = val
+
+ def __iter__(self):
+ return iter(self._list)
+
+ def __len__(self):
+ return 2
+
+ def __nonzero__(self):
+ return True
+
+ __bool__ = __nonzero__
+
+ rows = property(
+ fget = lambda self: self._list[0],
+ fset = lambda self, val: self._list.__setitem__(0, val),
+ )
+ cols = property(
+ fget = lambda self: self._list[1],
+ fset = lambda self, val: self._list.__setitem__(1, val),
+ )
+
+
+class ExpectProcess(threading.Thread):
+ def __init__(self, lib, dim, cmd, args, cwd=None, env=None):
+ super(ExpectProcess, self).__init__()
+ self.vterm = VTerm(lib, dim)
+ self.lock = threading.Lock()
+ self.dim = Dimensions(*dim)
+ self.cmd = cmd
+ self.args = args
+ self.cwd = cwd
+ self.env = env
+ self.buffer = []
+ self.child_lock = threading.Lock()
+ self.shutdown_event = threading.Event()
+ self.started_event = threading.Event()
+
+ def run(self):
+ with self.child_lock:
+ child = pexpect.spawn(self.cmd, self.args, cwd=self.cwd,
+ env=self.env)
+ sleep(0.5)
+ child.setwinsize(self.dim.rows, self.dim.cols)
+ sleep(0.5)
+ self.child = child
+ self.started_event.set()
+ status = None
+ while status is None and not self.shutdown_event.is_set():
+ try:
+ with self.child_lock:
+ s = child.read_nonblocking(size=1024, timeout=0)
+ status = child.status
+ except pexpect.TIMEOUT:
+ pass
+ except pexpect.EOF:
+ break
+ else:
+ with self.lock:
+ self.vterm.push(s)
+ self.buffer.append(s)
+
+ if status is None:
+ child.kill(SIGKILL)
+
+ def kill(self):
+ self.shutdown_event.set()
+
+ def resize(self, dim):
+ with self.child_lock:
+ self.dim = Dimensions(*dim)
+ self.child.setwinsize(self.dim.rows, self.dim.cols)
+ self.vterm.resize(self.dim)
+
+ def __getitem__(self, position):
+ with self.lock:
+ return self.vterm.vtscreen[position]
+
+ def read(self):
+ with self.lock:
+ ret = b''.join(self.buffer)
+ del self.buffer[:]
+ return ret
+
+ def send(self, data):
+ with self.child_lock:
+ self.child.send(data)
+
+ def get_highlighted_text(self, text, attrs, default_props=(),
+ use_escapes=False):
+ ret = []
+ new_attrs = attrs.copy()
+ for cell_properties, segment_text in text:
+ if use_escapes:
+ escapes = ('\033[38;2;{0};{1};{2};48;2;{3};{4};{5}'.format(
+ *(cell_properties[0] + cell_properties[1]))) + (
+ ';1' if cell_properties[2] else ''
+ ) + (
+ ';3' if cell_properties[3] else ''
+ ) + (
+ ';4' if cell_properties[4] else ''
+ ) + 'm'
+ ret.append(escapes + segment_text + '\033[0m')
+ else:
+ segment_text = segment_text.translate({'{': '{{', '}': '}}'})
+ if cell_properties not in new_attrs:
+ new_attrs[cell_properties] = len(new_attrs) + 1
+ props_name = new_attrs[cell_properties]
+ if props_name in default_props:
+ ret.append(segment_text)
+ else:
+ ret.append('{' + str(props_name) + ':' + segment_text + '}')
+ return ''.join(ret), new_attrs
+
+ def get_row(self, row, attrs, default_props=(), use_escapes=False):
+ with self.lock:
+ return self.get_highlighted_text((
+ (key, ''.join((cell.text for cell in subline)))
+ for key, subline in groupby((
+ self.vterm.vtscreen[row, col]
+ for col in range(self.dim.cols)
+ ), lambda cell: cell.cell_properties_key)
+ ), attrs, default_props, use_escapes)
+
+ def get_screen(self, attrs, default_props=(), use_escapes=False):
+ lines = []
+ for row in range(self.dim.rows):
+ line, attrs = self.get_row(row, attrs, default_props, use_escapes)
+ lines.append(line)
+ return '\n'.join(lines), attrs
+
+
+def test_expected_result(p, test, last_attempt, last_attempt_cb, attempts):
+ debugging_tests = not not os.environ.get('_POWERLINE_DEBUGGING_TESTS')
+ expected_text, attrs = test['expected_result']
+ result = None
+ while attempts:
+ if 'row' in test:
+ row = test['row']
+ else:
+ row = p.dim.rows - 1
+ while row >= 0 and not p[row, 0].text:
+ row -= 1
+ if row < 0:
+ row = 0
+ actual_text, all_attrs = p.get_row(row, attrs)
+ if actual_text == expected_text:
+ return True
+ attempts -= 1
+ print('Actual result does not match expected for row {0}. Attempts left: {1}.'.format(
+ row, attempts))
+ sleep(2)
+ print('Result (row {0}):'.format(row))
+ print(actual_text)
+ print('Expected:')
+ print(expected_text)
+ print('Attributes:')
+ for v, k in sorted(
+ ((v, k) for k, v in all_attrs.items()),
+ key=(lambda t: '%02u'.format(t[0]) if isinstance(t[0], int) else t[0]),
+ ):
+ print('{k!r}: {v!r},'.format(v=v, k=k))
+ print('Screen:')
+ screen, screen_attrs = p.get_screen(attrs, use_escapes=debugging_tests)
+ print(screen)
+ print(screen_attrs)
+ print('_' * 80)
+ print('Diff:')
+ print('=' * 80)
+ print(''.join((
+ u(line) for line in ndiff([actual_text + '\n'], [expected_text + '\n']))
+ ))
+ if last_attempt and last_attempt_cb:
+ last_attempt_cb()
+ return False
+
+
+ENV_BASE = {
+ # Reasoning:
+ # 1. vt* TERMs (used to be vt100 here) make tmux-1.9 use different and
+ # identical colors for inactive windows. This is not like tmux-1.6:
+ # foreground color is different from separator color and equal to (0,
+ # 102, 153) for some reason (separator has correct color). tmux-1.8 is
+ # fine, so are older versions (though tmux-1.6 and tmux-1.7 do not have
+ # highlighting for previously active window) and my system tmux-1.9a.
+ # 2. screen, xterm and some other non-256color terminals both have the same
+ # issue and make libvterm emit complains like `Unhandled CSI SGR 3231`.
+ # 3. screen-256color, xterm-256color and other -256color terminals make
+ # libvterm emit complains about unhandled escapes to stderr.
+ # 4. `st-256color` does not have any of the above problems, but it may be
+ # not present on the target system because it is installed with
+ # x11-terms/st and not with sys-libs/ncurses.
+ #
+ # For the given reasons decision was made: to fix tmux-1.9 tests and not
+ # make libvterm emit any data to stderr st-256color $TERM should be used, up
+ # until libvterm has its own terminfo database entry (if it ever will). To
+ # make sure that relevant terminfo entry is present on the target system it
+ # should be distributed with powerline test package. To make distribution
+ # not require modifying anything outside of powerline test directory
+ # TERMINFO variable is set.
+ #
+ # This fix propagates to non-tmux vterm tests just in case.
+ 'TERM': 'st-256color',
+ # Also $TERMINFO definition in get_env
+
+ 'POWERLINE_CONFIG_PATHS': os.path.abspath('powerline/config_files'),
+ 'POWERLINE_COMMAND': 'powerline-render',
+ 'LD_LIBRARY_PATH': os.environ.get('LD_LIBRARY_PATH', ''),
+ 'PYTHONPATH': os.environ.get('PYTHONPATH', ''),
+}
+
+
+def get_env(vterm_path, test_dir, *args, **kwargs):
+ env = ENV_BASE.copy()
+ env.update({
+ 'TERMINFO': os.path.join(test_dir, 'terminfo'),
+ 'PATH': vterm_path,
+ 'SHELL': os.path.join(vterm_path, 'bash'),
+ })
+ env.update(*args, **kwargs)
+ return env
+
+
+def do_terminal_tests(tests, cmd, dim, args, env, suite, cwd=None, fin_cb=None,
+ last_attempt_cb=None, attempts=None):
+ debugging_tests = not not os.environ.get('_POWERLINE_DEBUGGING_TESTS')
+ default_attempts = 2 if debugging_tests else 3
+ if attempts is None:
+ attempts = default_attempts
+ lib = os.environ.get('POWERLINE_LIBVTERM')
+ if not lib:
+ if os.path.exists('tests/bot-ci/deps/libvterm/libvterm.so'):
+ lib = 'tests/bot-ci/deps/libvterm/libvterm.so'
+ else:
+ lib = 'libvterm.so'
+
+ while attempts:
+ try:
+ p = ExpectProcess(
+ lib=lib,
+ dim=dim,
+ cmd=cmd,
+ args=args,
+ cwd=cwd,
+ env=env,
+ )
+ p.start()
+ p.started_event.wait()
+
+ ret = True
+
+ for i, test in enumerate(tests):
+ with suite.test(test.get('name', 'test_{0}'.format(i)),
+ attempts - 1) as ptest:
+ try:
+ test_prep = test['prep_cb']
+ except KeyError:
+ pass
+ else:
+ test_prep(p)
+ test_result = test_expected_result(
+ p, test, attempts == 0, last_attempt_cb,
+ test.get('attempts', default_attempts)
+ )
+ if not test_result:
+ ptest.fail('Result does not match expected')
+ ret = ret and test_result
+
+ if ret:
+ return ret
+ finally:
+ if fin_cb:
+ fin_cb(p=p, cmd=cmd, env=env)
+ p.kill()
+ p.join(10)
+ assert(not p.isAlive())
+
+ attempts -= 1
+
+ return False
diff --git a/tests/modules/lib/vterm.py b/tests/modules/lib/vterm.py
new file mode 100644
index 0000000..1984e1b
--- /dev/null
+++ b/tests/modules/lib/vterm.py
@@ -0,0 +1,193 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import ctypes
+
+from collections import namedtuple
+
+from powerline.lib.unicode import unicode, unichr, tointiter
+
+
+Dimensions = namedtuple('Dimensions', ('rows', 'cols'))
+
+
+class CTypesFunction(object):
+ def __init__(self, library, name, rettype, args):
+ self.name = name
+ self.prototype = ctypes.CFUNCTYPE(rettype, *[
+ arg[1] for arg in args
+ ])
+ self.args = args
+ self.func = self.prototype((name, library), tuple((
+ (1, arg[0]) for arg in args
+ )))
+
+ def __call__(self, *args, **kwargs):
+ return self.func(*args, **kwargs)
+
+ def __repr__(self):
+ return '{cls}(<library>, {name!r}, {rettype!r}, {args!r})'.format(
+ cls=self.__class__.__name__,
+ **self.__dict__
+ )
+
+
+class CTypesLibraryFuncsCollection(object):
+ def __init__(self, lib, **kwargs):
+ self.lib = lib
+ library_loader = ctypes.LibraryLoader(ctypes.CDLL)
+ library = library_loader.LoadLibrary(lib)
+ self.library = library
+ for name, args in kwargs.items():
+ self.__dict__[name] = CTypesFunction(library, name, *args)
+
+
+class VTermPos_s(ctypes.Structure):
+ _fields_ = (
+ ('row', ctypes.c_int),
+ ('col', ctypes.c_int),
+ )
+
+
+class VTermColor_s(ctypes.Structure):
+ _fields_ = (
+ ('red', ctypes.c_uint8),
+ ('green', ctypes.c_uint8),
+ ('blue', ctypes.c_uint8),
+ )
+
+
+class VTermScreenCellAttrs_s(ctypes.Structure):
+ _fields_ = (
+ ('bold', ctypes.c_uint, 1),
+ ('underline', ctypes.c_uint, 2),
+ ('italic', ctypes.c_uint, 1),
+ ('blink', ctypes.c_uint, 1),
+ ('reverse', ctypes.c_uint, 1),
+ ('strike', ctypes.c_uint, 1),
+ ('font', ctypes.c_uint, 4),
+ ('dwl', ctypes.c_uint, 1),
+ ('dhl', ctypes.c_uint, 2),
+ )
+
+
+VTERM_MAX_CHARS_PER_CELL = 6
+
+
+class VTermScreenCell_s(ctypes.Structure):
+ _fields_ = (
+ ('chars', ctypes.ARRAY(ctypes.c_uint32, VTERM_MAX_CHARS_PER_CELL)),
+ ('width', ctypes.c_char),
+ ('attrs', VTermScreenCellAttrs_s),
+ ('fg', VTermColor_s),
+ ('bg', VTermColor_s),
+ )
+
+
+VTerm_p = ctypes.c_void_p
+VTermScreen_p = ctypes.c_void_p
+
+
+def get_functions(lib):
+ return CTypesLibraryFuncsCollection(
+ lib,
+ vterm_new=(VTerm_p, (
+ ('rows', ctypes.c_int),
+ ('cols', ctypes.c_int)
+ )),
+ vterm_obtain_screen=(VTermScreen_p, (('vt', VTerm_p),)),
+ vterm_set_size=(None, (
+ ('vt', VTerm_p),
+ ('rows', ctypes.c_int),
+ ('cols', ctypes.c_int)
+ )),
+ vterm_screen_reset=(None, (
+ ('screen', VTermScreen_p),
+ ('hard', ctypes.c_int)
+ )),
+ vterm_input_write=(ctypes.c_size_t, (
+ ('vt', VTerm_p),
+ ('bytes', ctypes.POINTER(ctypes.c_char)),
+ ('size', ctypes.c_size_t),
+ )),
+ vterm_screen_get_cell=(ctypes.c_int, (
+ ('screen', VTermScreen_p),
+ ('pos', VTermPos_s),
+ ('cell', ctypes.POINTER(VTermScreenCell_s))
+ )),
+ vterm_free=(None, (('vt', VTerm_p),)),
+ vterm_set_utf8=(None, (('vt', VTerm_p), ('is_utf8', ctypes.c_int))),
+ )
+
+
+class VTermColor(object):
+ __slots__ = ('red', 'green', 'blue')
+
+ def __init__(self, color):
+ self.red = color.red
+ self.green = color.green
+ self.blue = color.blue
+
+ @property
+ def color_key(self):
+ return (self.red, self.green, self.blue)
+
+
+class VTermScreenCell(object):
+ def __init__(self, vtsc):
+ for field in VTermScreenCellAttrs_s._fields_:
+ field_name = field[0]
+ setattr(self, field_name, getattr(vtsc.attrs, field_name))
+ self.text = ''.join((
+ unichr(vtsc.chars[i]) for i in range(VTERM_MAX_CHARS_PER_CELL)
+ )).rstrip('\x00')
+ self.width = next(tointiter(vtsc.width))
+ self.fg = VTermColor(vtsc.fg)
+ self.bg = VTermColor(vtsc.bg)
+ self.cell_properties_key = (
+ self.fg.color_key,
+ self.bg.color_key,
+ self.bold,
+ self.underline,
+ self.italic,
+ )
+
+
+class VTermScreen(object):
+ def __init__(self, functions, screen):
+ self.functions = functions
+ self.screen = screen
+
+ def __getitem__(self, position):
+ pos = VTermPos_s(*position)
+ cell = VTermScreenCell_s()
+ ret = self.functions.vterm_screen_get_cell(self.screen, pos, cell)
+ if ret != 1:
+ raise ValueError('vterm_screen_get_cell returned {0}'.format(ret))
+ return VTermScreenCell(cell)
+
+ def reset(self, hard):
+ self.functions.vterm_screen_reset(self.screen, int(bool(hard)))
+
+
+class VTerm(object):
+ def __init__(self, lib, dim):
+ self.functions = get_functions(lib)
+ self.vt = self.functions.vterm_new(dim.rows, dim.cols)
+ self.functions.vterm_set_utf8(self.vt, 1)
+ self.vtscreen = VTermScreen(self.functions, self.functions.vterm_obtain_screen(self.vt))
+ self.vtscreen.reset(True)
+
+ def push(self, data):
+ if isinstance(data, unicode):
+ data = data.encode('utf-8')
+ return self.functions.vterm_input_write(self.vt, data, len(data))
+
+ def resize(self, dim):
+ self.functions.vterm_set_size(self.vt, dim.rows, dim.cols)
+
+ def __del__(self):
+ try:
+ self.functions.vterm_free(self.vt)
+ except AttributeError:
+ pass
diff --git a/tests/modules/matchers.py b/tests/modules/matchers.py
new file mode 100644
index 0000000..e905de3
--- /dev/null
+++ b/tests/modules/matchers.py
@@ -0,0 +1,6 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+
+def always_true(matcher_info):
+ return True
diff --git a/tests/modules/vim.py b/tests/modules/vim.py
new file mode 100644
index 0000000..3f6882c
--- /dev/null
+++ b/tests/modules/vim.py
@@ -0,0 +1,927 @@
+# vim:fileencoding=utf-8:noet
+_log = []
+vars = {}
+vvars = {'version': 703}
+_tabpage = 0
+_mode = 'n'
+_buf_purge_events = set()
+options = {
+ 'paste': 0,
+ 'ambiwidth': 'single',
+ 'columns': 80,
+ 'encoding': 'utf-8',
+}
+_last_bufnr = 0
+_highlights = {}
+from collections import defaultdict as _defaultdict
+_environ = _defaultdict(lambda: '')
+del _defaultdict
+
+
+_thread_id = None
+
+
+def _set_thread_id():
+ global _thread_id
+ from threading import current_thread
+ _thread_id = current_thread().ident
+
+
+# Assuming import is done from the main thread
+_set_thread_id()
+
+
+def _print_log():
+ for item in _log:
+ print (item)
+ _log[:] = ()
+
+
+def _vim(func):
+ from functools import wraps
+ from threading import current_thread
+
+ @wraps(func)
+ def f(*args, **kwargs):
+ global _thread_id
+ if _thread_id != current_thread().ident:
+ raise RuntimeError('Accessing vim from separate threads is not allowed')
+ _log.append((func.__name__, args))
+ return func(*args, **kwargs)
+
+ return f
+
+
+def _unicode(func):
+ from functools import wraps
+ import sys
+
+ if sys.version_info < (3,):
+ return func
+
+ @wraps(func)
+ def f(*args, **kwargs):
+ from powerline.lib.unicode import u
+ ret = func(*args, **kwargs)
+ if isinstance(ret, bytes):
+ ret = u(ret)
+ return ret
+
+ return f
+
+
+class _Buffers(object):
+ @_vim
+ def __init__(self):
+ self.d = {}
+
+ @_vim
+ def __len__(self):
+ return len(self.d)
+
+ @_vim
+ def __getitem__(self, item):
+ return self.d[item]
+
+ @_vim
+ def __setitem__(self, item, value):
+ self.d[item] = value
+
+ @_vim
+ def __iter__(self):
+ return iter(self.d.values())
+
+ @_vim
+ def __contains__(self, item):
+ return item in self.d
+
+ @_vim
+ def _keys(self):
+ return self.d.keys()
+
+ @_vim
+ def _pop(self, *args, **kwargs):
+ return self.d.pop(*args, **kwargs)
+
+
+buffers = _Buffers()
+
+
+class _ObjList(object):
+ @_vim
+ def __init__(self, objtype):
+ self.l = []
+ self.objtype = objtype
+
+ @_vim
+ def __getitem__(self, item):
+ return self.l[item - int(item > 0)]
+
+ @_vim
+ def __len__(self):
+ return len(self.l)
+
+ @_vim
+ def __iter__(self):
+ return iter(self.l)
+
+ @_vim
+ def _pop(self, idx):
+ obj = self.l.pop(idx - 1)
+ for moved_obj in self.l[idx - 1:]:
+ moved_obj.number -= 1
+ return obj
+
+ @_vim
+ def _append(self, *args, **kwargs):
+ return self.l.append(*args, **kwargs)
+
+ @_vim
+ def _new(self, *args, **kwargs):
+ number = len(self) + 1
+ new_obj = self.objtype(number, *args, **kwargs)
+ self._append(new_obj)
+ return new_obj
+
+
+def _construct_result(r):
+ import sys
+ if sys.version_info < (3,):
+ return r
+ else:
+ if isinstance(r, str):
+ return r.encode('utf-8')
+ elif isinstance(r, list):
+ return [_construct_result(i) for i in r]
+ elif isinstance(r, dict):
+ return dict((
+ (_construct_result(k), _construct_result(v))
+ for k, v in r.items()
+ ))
+ return r
+
+
+def _str_func(func):
+ from functools import wraps
+
+ @wraps(func)
+ def f(*args, **kwargs):
+ return _construct_result(func(*args, **kwargs))
+ return f
+
+
+def _log_print():
+ import sys
+ for entry in _log:
+ sys.stdout.write(repr(entry) + '\n')
+
+
+_current_group = None
+_on_wipeout = []
+
+
+@_vim
+def command(cmd):
+ global _current_group
+ cmd = cmd.lstrip()
+ if cmd.startswith('let g:'):
+ import re
+ varname, value = re.compile(r'^let g:(\w+)\s*=\s*(.*)').match(cmd).groups()
+ vars[varname] = value
+ elif cmd.startswith('hi '):
+ sp = cmd.split()
+ _highlights[sp[1]] = sp[2:]
+ elif cmd.startswith('augroup'):
+ augroup = cmd.partition(' ')[2]
+ if augroup.upper() == 'END':
+ _current_group = None
+ else:
+ _current_group = augroup
+ elif cmd.startswith('autocmd'):
+ rest = cmd.partition(' ')[2]
+ auevent, rest = rest.partition(' ')[::2]
+ pattern, aucmd = rest.partition(' ')[::2]
+ if auevent != 'BufWipeout' or pattern != '*':
+ raise NotImplementedError
+ import sys
+ if sys.version_info < (3,):
+ if not aucmd.startswith(':python '):
+ raise NotImplementedError
+ else:
+ if not aucmd.startswith(':python3 '):
+ raise NotImplementedError
+ _on_wipeout.append(aucmd.partition(' ')[2])
+ elif cmd.startswith('set '):
+ if cmd.startswith('set statusline='):
+ options['statusline'] = cmd[len('set statusline='):]
+ elif cmd.startswith('set tabline='):
+ options['tabline'] = cmd[len('set tabline='):]
+ else:
+ raise NotImplementedError(cmd)
+ else:
+ raise NotImplementedError(cmd)
+
+
+@_vim
+@_unicode
+def eval(expr):
+ if expr.startswith('g:'):
+ return vars[expr[2:]]
+ elif expr.startswith('v:'):
+ return vvars[expr[2:]]
+ elif expr.startswith('&'):
+ return options[expr[1:]]
+ elif expr.startswith('$'):
+ return _environ[expr[1:]]
+ elif expr.startswith('PowerlineRegisterCachePurgerEvent'):
+ _buf_purge_events.add(expr[expr.find('"') + 1:expr.rfind('"') - 1])
+ return '0'
+ elif expr.startswith('exists('):
+ return '0'
+ elif expr.startswith('getwinvar('):
+ import re
+ match = re.match(r'^getwinvar\((\d+), "(\w+)"\)$', expr)
+ if not match:
+ raise NotImplementedError(expr)
+ winnr = int(match.group(1))
+ varname = match.group(2)
+ return _emul_getwinvar(winnr, varname)
+ elif expr.startswith('has_key('):
+ import re
+ match = re.match(r'^has_key\(getwinvar\((\d+), ""\), "(\w+)"\)$', expr)
+ if match:
+ winnr = int(match.group(1))
+ varname = match.group(2)
+ return 0 + (varname in current.tabpage.windows[winnr].vars)
+ else:
+ match = re.match(r'^has_key\(gettabwinvar\((\d+), (\d+), ""\), "(\w+)"\)$', expr)
+ if not match:
+ raise NotImplementedError(expr)
+ tabnr = int(match.group(1))
+ winnr = int(match.group(2))
+ varname = match.group(3)
+ return 0 + (varname in tabpages[tabnr].windows[winnr].vars)
+ elif expr == 'getbufvar("%", "NERDTreeRoot").path.str()':
+ import os
+ assert os.path.basename(current.buffer.name).startswith('NERD_tree_')
+ return '/usr/include'
+ elif expr.startswith('getbufvar('):
+ import re
+ match = re.match(r'^getbufvar\((\d+), ["\'](.+)["\']\)$', expr)
+ if not match:
+ raise NotImplementedError(expr)
+ bufnr = int(match.group(1))
+ varname = match.group(2)
+ return _emul_getbufvar(bufnr, varname)
+ elif expr == 'tabpagenr()':
+ return current.tabpage.number
+ elif expr == 'tabpagenr("$")':
+ return len(tabpages)
+ elif expr.startswith('tabpagewinnr('):
+ tabnr = int(expr[len('tabpagewinnr('):-1])
+ return tabpages[tabnr].window.number
+ elif expr.startswith('tabpagebuflist('):
+ import re
+ match = re.match(r'tabpagebuflist\((\d+)\)\[(\d+)\]', expr)
+ tabnr = int(match.group(1))
+ winnr = int(match.group(2)) + 1
+ return tabpages[tabnr].windows[winnr].buffer.number
+ elif expr.startswith('gettabwinvar('):
+ import re
+ match = re.match(r'gettabwinvar\((\d+), (\d+), "(\w+)"\)', expr)
+ tabnr = int(match.group(1))
+ winnr = int(match.group(2))
+ varname = match.group(3)
+ return tabpages[tabnr].windows[winnr].vars[varname]
+ elif expr.startswith('type(function('):
+ import re
+ match = re.match(r'^type\(function\("([^"]+)"\)\) == 2$', expr)
+ if not match:
+ raise NotImplementedError(expr)
+ return 0
+ raise NotImplementedError(expr)
+
+
+@_vim
+def bindeval(expr):
+ if expr == 'g:':
+ return vars
+ elif expr == '{}':
+ return {}
+ elif expr == '[]':
+ return []
+ import re
+ match = re.compile(r'^function\("([^"\\]+)"\)$').match(expr)
+ if match:
+ return globals()['_emul_' + match.group(1)]
+ else:
+ raise NotImplementedError
+
+
+@_vim
+@_str_func
+def _emul_mode(*args):
+ if args and args[0]:
+ return _mode
+ else:
+ return _mode[0]
+
+
+@_vim
+@_str_func
+def _emul_getbufvar(bufnr, varname):
+ import re
+ if varname[0] == '&':
+ if bufnr == '%':
+ bufnr = current.buffer.number
+ if bufnr not in buffers:
+ return ''
+ try:
+ return buffers[bufnr].options[varname[1:]]
+ except KeyError:
+ try:
+ return options[varname[1:]]
+ except KeyError:
+ return ''
+ elif re.match('^[a-zA-Z_]+$', varname):
+ if bufnr == '%':
+ bufnr = current.buffer.number
+ if bufnr not in buffers:
+ return ''
+ return buffers[bufnr].vars[varname]
+ raise NotImplementedError
+
+
+@_vim
+@_str_func
+def _emul_getwinvar(winnr, varname):
+ return current.tabpage.windows[winnr].vars.get(varname, '')
+
+
+@_vim
+def _emul_setwinvar(winnr, varname, value):
+ current.tabpage.windows[winnr].vars[varname] = value
+
+
+@_vim
+def _emul_virtcol(expr):
+ if expr == '.':
+ return current.window.cursor[1] + 1
+ if isinstance(expr, list) and len(expr) == 3:
+ return expr[-2] + expr[-1]
+ raise NotImplementedError
+
+
+_v_pos = None
+
+
+@_vim
+def _emul_getpos(expr):
+ if expr == '.':
+ return [0, current.window.cursor[0] + 1, current.window.cursor[1] + 1, 0]
+ if expr == 'v':
+ return _v_pos or [0, current.window.cursor[0] + 1, current.window.cursor[1] + 1, 0]
+ raise NotImplementedError
+
+
+@_vim
+@_str_func
+def _emul_fnamemodify(path, modstring):
+ import os
+ _modifiers = {
+ '~': lambda path: path.replace(os.environ['HOME'].encode('utf-8'), b'~') if path.startswith(os.environ['HOME'].encode('utf-8')) else path,
+ '.': lambda path: (lambda tpath: path if tpath[:3] == b'..' + os.sep.encode() else tpath)(os.path.relpath(path)),
+ 't': lambda path: os.path.basename(path),
+ 'h': lambda path: os.path.dirname(path),
+ }
+
+ for mods in modstring.split(':')[1:]:
+ path = _modifiers[mods](path)
+ return path
+
+
+@_vim
+@_str_func
+def _emul_expand(expr):
+ global _abuf
+ if expr == '<abuf>':
+ return _abuf or current.buffer.number
+ raise NotImplementedError
+
+
+@_vim
+def _emul_bufnr(expr):
+ if expr == '$':
+ return _last_bufnr
+ raise NotImplementedError
+
+
+@_vim
+def _emul_exists(ident):
+ if ident.startswith('g:'):
+ return ident[2:] in vars
+ elif ident.startswith(':'):
+ return 0
+ raise NotImplementedError
+
+
+@_vim
+def _emul_line2byte(line):
+ buflines = current.buffer._buf_lines
+ if line == len(buflines) + 1:
+ return sum((len(s) for s in buflines)) + 1
+ raise NotImplementedError
+
+
+@_vim
+def _emul_line(expr):
+ cursorline = current.window.cursor[0] + 1
+ numlines = len(current.buffer._buf_lines)
+ if expr == 'w0':
+ return max(cursorline - 5, 1)
+ if expr == 'w$':
+ return min(cursorline + 5, numlines)
+ raise NotImplementedError
+
+
+@_vim
+@_str_func
+def _emul_strtrans(s):
+ # FIXME Do more replaces
+ return s.replace(b'\xFF', b'<ff>')
+
+
+@_vim
+@_str_func
+def _emul_bufname(bufnr):
+ try:
+ return buffers[bufnr]._name or b''
+ except KeyError:
+ return b''
+
+
+_window_id = 0
+
+
+class _Window(object):
+ def __init__(self, number, buffer=None, cursor=(1, 0), width=80):
+ global _window_id
+ self.cursor = cursor
+ self.width = width
+ self.number = number
+ if buffer:
+ if type(buffer) is _Buffer:
+ self.buffer = buffer
+ else:
+ self.buffer = _Buffer(**buffer)
+ else:
+ self.buffer = _Buffer()
+ _window_id += 1
+ self._window_id = _window_id
+ self.options = {}
+ self.vars = {
+ 'powerline_window_id': self._window_id,
+ }
+
+ def __repr__(self):
+ return '<window ' + str(self.number - 1) + '>'
+
+
+class _Tabpage(object):
+ def __init__(self, number):
+ self.windows = _ObjList(_Window)
+ self.number = number
+
+ def _new_window(self, **kwargs):
+ self.window = self.windows._new(**kwargs)
+ return self.window
+
+ def _close_window(self, winnr, open_window=True):
+ curwinnr = self.window.number
+ win = self.windows._pop(winnr)
+ if self.windows and winnr == curwinnr:
+ self.window = self.windows[-1]
+ elif open_window:
+ current.tabpage._new_window()
+ return win
+
+ def _close(self):
+ global _tabpage
+ while self.windows:
+ self._close_window(1, False)
+ tabpages._pop(self.number)
+ _tabpage = len(tabpages)
+
+
+tabpages = _ObjList(_Tabpage)
+
+
+_abuf = None
+
+
+class _Buffer(object):
+ def __init__(self, name=None):
+ global _last_bufnr
+ _last_bufnr += 1
+ bufnr = _last_bufnr
+ self.number = bufnr
+ # FIXME Use unicode() for python-3
+ self.name = name
+ self.vars = {'changedtick': 1}
+ self.options = {
+ 'modified': 0,
+ 'readonly': 0,
+ 'fileformat': 'unix',
+ 'filetype': '',
+ 'buftype': '',
+ 'fileencoding': 'utf-8',
+ 'textwidth': 80,
+ }
+ self._buf_lines = ['']
+ self._undostate = [self._buf_lines[:]]
+ self._undo_written = len(self._undostate)
+ buffers[bufnr] = self
+
+ @property
+ def name(self):
+ import sys
+ if sys.version_info < (3,):
+ return self._name
+ else:
+ return str(self._name, 'utf-8') if self._name else None
+
+ @name.setter
+ def name(self, name):
+ if name is None:
+ self._name = None
+ else:
+ import os
+ if type(name) is not bytes:
+ name = name.encode('utf-8')
+ if b':/' in name:
+ self._name = name
+ else:
+ self._name = os.path.abspath(name)
+
+ def __getitem__(self, line):
+ return self._buf_lines[line]
+
+ def __setitem__(self, line, value):
+ self.options['modified'] = 1
+ self.vars['changedtick'] += 1
+ self._buf_lines[line] = value
+ from copy import copy
+ self._undostate.append(copy(self._buf_lines))
+
+ def __setslice__(self, *args):
+ self.options['modified'] = 1
+ self.vars['changedtick'] += 1
+ self._buf_lines.__setslice__(*args)
+ from copy import copy
+ self._undostate.append(copy(self._buf_lines))
+
+ def __getslice__(self, *args):
+ return self._buf_lines.__getslice__(*args)
+
+ def __len__(self):
+ return len(self._buf_lines)
+
+ def __repr__(self):
+ return '<buffer ' + str(self.name) + '>'
+
+ def __del__(self):
+ global _abuf
+ bufnr = self.number
+ try:
+ import __main__
+ except ImportError:
+ pass
+ except RuntimeError:
+ # Module may have already been garbage-collected
+ pass
+ else:
+ if _on_wipeout:
+ _abuf = bufnr
+ try:
+ for event in _on_wipeout:
+ exec(event, __main__.__dict__)
+ finally:
+ _abuf = None
+
+
+class _Current(object):
+ @property
+ def buffer(self):
+ return self.window.buffer
+
+ @property
+ def window(self):
+ return self.tabpage.window
+
+ @property
+ def tabpage(self):
+ return tabpages[_tabpage - 1]
+
+
+current = _Current()
+
+
+_dict = None
+
+
+@_vim
+def _init():
+ global _dict
+
+ if _dict:
+ return _dict
+
+ _dict = {}
+ for varname, value in globals().items():
+ if varname[0] != '_':
+ _dict[varname] = value
+ _tabnew()
+ return _dict
+
+
+@_vim
+def _get_segment_info():
+ mode_translations = {
+ chr(ord('V') - 0x40): '^V',
+ chr(ord('S') - 0x40): '^S',
+ }
+ mode = _mode
+ mode = mode_translations.get(mode, mode)
+ window = current.window
+ buffer = current.buffer
+ tabpage = current.tabpage
+ return {
+ 'window': window,
+ 'winnr': window.number,
+ 'buffer': buffer,
+ 'bufnr': buffer.number,
+ 'tabpage': tabpage,
+ 'tabnr': tabpage.number,
+ 'window_id': window._window_id,
+ 'mode': mode,
+ 'encoding': options['encoding'],
+ }
+
+
+@_vim
+def _launch_event(event):
+ pass
+
+
+@_vim
+def _start_mode(mode):
+ global _mode
+ if mode == 'i':
+ _launch_event('InsertEnter')
+ elif _mode == 'i':
+ _launch_event('InsertLeave')
+ _mode = mode
+
+
+@_vim
+def _undo():
+ if len(current.buffer._undostate) == 1:
+ return
+ buffer = current.buffer
+ buffer._undostate.pop(-1)
+ buffer._buf_lines = buffer._undostate[-1]
+ if buffer._undo_written == len(buffer._undostate):
+ buffer.options['modified'] = 0
+
+
+@_vim
+def _edit(name=None):
+ if current.buffer.name is None:
+ buffer = current.buffer
+ buffer.name = name
+ else:
+ buffer = _Buffer(name)
+ current.window.buffer = buffer
+
+
+@_vim
+def _tabnew(name=None):
+ global windows
+ global _tabpage
+ tabpage = tabpages._new()
+ windows = tabpage.windows
+ _tabpage = len(tabpages)
+ _new(name)
+ return tabpage
+
+
+@_vim
+def _new(name=None):
+ current.tabpage._new_window(buffer={'name': name})
+
+
+@_vim
+def _split():
+ current.tabpage._new_window(buffer=current.buffer)
+
+
+@_vim
+def _close(winnr, wipe=True):
+ win = current.tabpage._close_window(winnr)
+ if wipe:
+ for w in current.tabpage.windows:
+ if w.buffer.number == win.buffer.number:
+ break
+ else:
+ _bw(win.buffer.number)
+
+
+@_vim
+def _bw(bufnr=None):
+ bufnr = bufnr or current.buffer.number
+ winnr = 1
+ for win in current.tabpage.windows:
+ if win.buffer.number == bufnr:
+ _close(winnr, wipe=False)
+ winnr += 1
+ buffers._pop(bufnr)
+ if not buffers:
+ _Buffer()
+ _b(max(buffers._keys()))
+
+
+@_vim
+def _b(bufnr):
+ current.window.buffer = buffers[bufnr]
+
+
+@_vim
+def _set_cursor(line, col):
+ current.window.cursor = (line, col)
+ if _mode == 'n':
+ _launch_event('CursorMoved')
+ elif _mode == 'i':
+ _launch_event('CursorMovedI')
+
+
+@_vim
+def _get_buffer():
+ return current.buffer
+
+
+@_vim
+def _set_bufoption(option, value, bufnr=None):
+ buffers[bufnr or current.buffer.number].options[option] = value
+ if option == 'filetype':
+ _launch_event('FileType')
+
+
+class _WithNewBuffer(object):
+ def __init__(self, func, *args, **kwargs):
+ self.call = lambda: func(*args, **kwargs)
+
+ def __enter__(self):
+ self.call()
+ self.bufnr = current.buffer.number
+ return _get_segment_info()
+
+ def __exit__(self, *args):
+ _bw(self.bufnr)
+
+
+@_vim
+def _set_dict(d, new, setfunc=None):
+ if not setfunc:
+ def setfunc(k, v):
+ d[k] = v
+
+ old = {}
+ na = []
+ for k, v in new.items():
+ try:
+ old[k] = d[k]
+ except KeyError:
+ na.append(k)
+ setfunc(k, v)
+ return old, na
+
+
+class _WithBufOption(object):
+ def __init__(self, **new):
+ self.new = new
+
+ def __enter__(self):
+ self.buffer = current.buffer
+ self.old = _set_dict(self.buffer.options, self.new, _set_bufoption)[0]
+
+ def __exit__(self, *args):
+ self.buffer.options.update(self.old)
+
+
+class _WithMode(object):
+ def __init__(self, new):
+ self.new = new
+
+ def __enter__(self):
+ self.old = _mode
+ _start_mode(self.new)
+ return _get_segment_info()
+
+ def __exit__(self, *args):
+ _start_mode(self.old)
+
+
+class _WithDict(object):
+ def __init__(self, d, **new):
+ self.new = new
+ self.d = d
+
+ def __enter__(self):
+ self.old, self.na = _set_dict(self.d, self.new)
+
+ def __exit__(self, *args):
+ self.d.update(self.old)
+ for k in self.na:
+ self.d.pop(k)
+
+
+class _WithSplit(object):
+ def __enter__(self):
+ _split()
+
+ def __exit__(self, *args):
+ _close(2, wipe=False)
+
+
+class _WithBufName(object):
+ def __init__(self, new):
+ self.new = new
+
+ def __enter__(self):
+ import os
+ buffer = current.buffer
+ self.buffer = buffer
+ self.old = buffer.name
+ buffer.name = self.new
+
+ def __exit__(self, *args):
+ self.buffer.name = self.old
+
+
+class _WithNewTabPage(object):
+ def __init__(self, *args, **kwargs):
+ self.args = args
+ self.kwargs = kwargs
+
+ def __enter__(self):
+ self.tab = _tabnew(*self.args, **self.kwargs)
+
+ def __exit__(self, *args):
+ self.tab._close()
+
+
+class _WithGlobal(object):
+ def __init__(self, **kwargs):
+ self.kwargs = kwargs
+
+ def __enter__(self):
+ self.empty = object()
+ self.old = dict(((key, globals().get(key, self.empty)) for key in self.kwargs))
+ globals().update(self.kwargs)
+
+ def __exit__(self, *args):
+ for k, v in self.old.items():
+ if v is self.empty:
+ globals().pop(k, None)
+ else:
+ globals()[k] = v
+
+
+@_vim
+def _with(key, *args, **kwargs):
+ if key == 'buffer':
+ return _WithNewBuffer(_edit, *args, **kwargs)
+ elif key == 'bufname':
+ return _WithBufName(*args, **kwargs)
+ elif key == 'mode':
+ return _WithMode(*args, **kwargs)
+ elif key == 'bufoptions':
+ return _WithBufOption(**kwargs)
+ elif key == 'options':
+ return _WithDict(options, **kwargs)
+ elif key == 'globals':
+ return _WithDict(vars, **kwargs)
+ elif key == 'wvars':
+ return _WithDict(current.window.vars, **kwargs)
+ elif key == 'environ':
+ return _WithDict(_environ, **kwargs)
+ elif key == 'split':
+ return _WithSplit()
+ elif key == 'tabpage':
+ return _WithNewTabPage(*args, **kwargs)
+ elif key == 'vpos':
+ return _WithGlobal(_v_pos=[0, kwargs['line'], kwargs['col'], kwargs['off']])
+
+
+class error(Exception):
+ pass