diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-06 02:04:06 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-06 02:04:06 +0000 |
commit | a8637fe80c24fb04bf6cc8ae8459877535f1341b (patch) | |
tree | 5d263b4543e10940f5e9a79a8fe981c5a3414bd7 /tests/test_python | |
parent | Initial commit. (diff) | |
download | powerline-a8637fe80c24fb04bf6cc8ae8459877535f1341b.tar.xz powerline-a8637fe80c24fb04bf6cc8ae8459877535f1341b.zip |
Adding upstream version 2.7.upstream/2.7upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'tests/test_python')
-rw-r--r-- | tests/test_python/empty | 0 | ||||
-rwxr-xr-x | tests/test_python/test.sh | 13 | ||||
-rw-r--r-- | tests/test_python/test_cmdline.py | 149 | ||||
-rw-r--r-- | tests/test_python/test_config_merging.py | 270 | ||||
-rw-r--r-- | tests/test_python/test_config_reload.py | 319 | ||||
-rw-r--r-- | tests/test_python/test_configuration.py | 877 | ||||
-rw-r--r-- | tests/test_python/test_lib.py | 733 | ||||
-rw-r--r-- | tests/test_python/test_lib_config.py | 52 | ||||
-rw-r--r-- | tests/test_python/test_listers.py | 227 | ||||
-rw-r--r-- | tests/test_python/test_logging.py | 467 | ||||
-rw-r--r-- | tests/test_python/test_provided_config_files.py | 201 | ||||
-rw-r--r-- | tests/test_python/test_segments.py | 1711 | ||||
-rw-r--r-- | tests/test_python/test_selectors.py | 36 | ||||
-rw-r--r-- | tests/test_python/test_watcher.py | 245 |
14 files changed, 5300 insertions, 0 deletions
diff --git a/tests/test_python/empty b/tests/test_python/empty new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/test_python/empty diff --git a/tests/test_python/test.sh b/tests/test_python/test.sh new file mode 100755 index 0000000..f042237 --- /dev/null +++ b/tests/test_python/test.sh @@ -0,0 +1,13 @@ +#!/bin/sh +. tests/shlib/common.sh + +enter_suite python final + +for file in "$ROOT"/tests/test_python/test_*.py ; do + test_name="${file##*/test_}" + if ! "$PYTHON" "$file" --verbose --catch ; then + fail "${test_name%.py}" F "Failed test(s) from $file" + fi +done + +exit_suite diff --git a/tests/test_python/test_cmdline.py b/tests/test_python/test_cmdline.py new file mode 100644 index 0000000..470a7b4 --- /dev/null +++ b/tests/test_python/test_cmdline.py @@ -0,0 +1,149 @@ +# vim:fileencoding=utf-8:noet + +'''Tests for shell.py parser''' + +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import sys + +if sys.version_info < (3,): + from io import BytesIO as StrIO +else: + from io import StringIO as StrIO + +from powerline.commands.main import get_argparser, finish_args + +from tests.modules import TestCase +from tests.modules.lib import replace_attr + + +class TestParser(TestCase): + def test_main_err(self): + parser = get_argparser() + out = StrIO() + err = StrIO() + + def flush(): + out.truncate(0) + err.truncate(0) + + with replace_attr(sys, 'stdout', out, 'stderr', err): + for raising_args, raising_reg in [ + ([], 'too few arguments|the following arguments are required: ext'), + (['-r'], 'expected one argument'), + (['shell', '-r'], 'expected one argument'), + (['shell', '-w'], 'expected one argument'), + (['shell', '-c'], 'expected one argument'), + (['shell', '-t'], 'expected one argument'), + (['shell', '-p'], 'expected one argument'), + (['shell', '-R'], 'expected one argument'), + (['shell', '--renderer-module'], 'expected one argument'), + (['shell', '--width'], 'expected one argument'), + (['shell', '--last-exit-code'], 'expected one argument'), + (['shell', '--last-pipe-status'], 'expected one argument'), + (['shell', '--config-override'], 'expected one argument'), + (['shell', '--theme-override'], 'expected one argument'), + (['shell', '--config-path'], 'expected one argument'), + (['shell', '--renderer-arg'], 'expected one argument'), + (['shell', '--jobnum'], 'expected one argument'), + (['-r', '.zsh'], 'too few arguments|the following arguments are required: ext'), + (['shell', '--last-exit-code', 'i'], 'invalid int_or_sig value'), + (['shell', '--last-pipe-status', '1 i'], 'invalid <lambda> value'), + ]: + self.assertRaises(SystemExit, parser.parse_args, raising_args) + self.assertFalse(out.getvalue()) + self.assertRegexpMatches(err.getvalue(), raising_reg) + flush() + + def test_main_normal(self): + parser = get_argparser() + out = StrIO() + err = StrIO() + with replace_attr(sys, 'stdout', out, 'stderr', err): + for argv, expargs in [ + (['shell', 'left'], {'ext': ['shell'], 'side': 'left'}), + (['shell', 'left', '-r', '.zsh'], {'ext': ['shell'], 'renderer_module': '.zsh', 'side': 'left'}), + ([ + 'shell', + 'left', + '-r', '.zsh', + '--last-exit-code', '10', + '--last-pipe-status', '10 20 30', + '--jobnum=10', + '-w', '100', + '-c', 'common.term_truecolor=true', + '-c', 'common.spaces=4', + '-t', 'default.segment_data.hostname.before=H:', + '-p', '.', + '-p', '..', + '-R', 'smth={"abc":"def"}', + ], { + 'ext': ['shell'], + 'side': 'left', + 'renderer_module': '.zsh', + 'last_exit_code': 10, + 'last_pipe_status': [10, 20, 30], + 'jobnum': 10, + 'width': 100, + 'config_override': {'common': {'term_truecolor': True, 'spaces': 4}}, + 'theme_override': { + 'default': { + 'segment_data': { + 'hostname': { + 'before': 'H:' + } + } + } + }, + 'config_path': ['.', '..'], + 'renderer_arg': {'smth': {'abc': 'def'}}, + }), + (['shell', 'left', '-R', 'arg=true'], { + 'ext': ['shell'], + 'side': 'left', + 'renderer_arg': {'arg': True}, + }), + (['shell', 'left', '-R', 'arg=true', '-R', 'arg='], { + 'ext': ['shell'], + 'side': 'left', + 'renderer_arg': {}, + }), + (['shell', 'left', '-R', 'arg='], {'ext': ['shell'], 'renderer_arg': {}, 'side': 'left'}), + (['shell', 'left', '-t', 'default.segment_info={"hostname": {}}'], { + 'ext': ['shell'], + 'side': 'left', + 'theme_override': { + 'default': { + 'segment_info': { + 'hostname': {} + } + } + }, + }), + (['shell', 'left', '-c', 'common={ }'], { + 'ext': ['shell'], + 'side': 'left', + 'config_override': {'common': {}}, + }), + (['shell', 'left', '--last-pipe-status='], { + 'ext': ['shell'], + 'side': 'left', + 'last_pipe_status': [], + }), + ]: + args = parser.parse_args(argv) + finish_args(parser, {}, args) + for key, val in expargs.items(): + self.assertEqual(getattr(args, key), val) + for key, val in args.__dict__.items(): + if key not in expargs: + self.assertFalse(val, msg='key {0} is {1} while it should be something false'.format(key, val)) + self.assertFalse(err.getvalue() + out.getvalue(), msg='unexpected output: {0!r} {1!r}'.format( + err.getvalue(), + out.getvalue(), + )) + + +if __name__ == '__main__': + from tests.modules import main + main() diff --git a/tests/test_python/test_config_merging.py b/tests/test_python/test_config_merging.py new file mode 100644 index 0000000..3f4fa2a --- /dev/null +++ b/tests/test_python/test_config_merging.py @@ -0,0 +1,270 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import os +import json + +from subprocess import check_call +from operator import add +from shutil import rmtree + +from powerline.lib.dict import mergedicts_copy as mdc +from powerline import Powerline + +from tests.modules import TestCase +from tests.modules.lib.config_mock import select_renderer, UT + + +CONFIG_DIR = 'tests/config' + + +root_config = lambda: { + 'common': { + 'interval': None, + 'watcher': 'auto', + }, + 'ext': { + 'test': { + 'theme': 'default', + 'colorscheme': 'default', + }, + }, +} + + +colors_config = lambda: { + 'colors': { + 'c1': 1, + 'c2': 2, + }, + 'gradients': { + }, +} + + +colorscheme_config = lambda: { + 'groups': { + 'g': {'fg': 'c1', 'bg': 'c2', 'attrs': []}, + } +} + + +theme_config = lambda: { + 'segment_data': { + 's': { + 'before': 'b', + }, + }, + 'segments': { + 'left': [ + { + 'type': 'string', + 'name': 's', + 'contents': 't', + 'highlight_groups': ['g'], + }, + ], + 'right': [], + } +} + +top_theme_config = lambda: { + 'dividers': { + 'left': { + 'hard': '#>', + 'soft': '|>', + }, + 'right': { + 'hard': '<#', + 'soft': '<|', + }, + }, + 'spaces': 0, +} + + +main_tree = lambda: { + '1/config': root_config(), + '1/colors': colors_config(), + '1/colorschemes/default': colorscheme_config(), + '1/themes/test/default': theme_config(), + '1/themes/' + UT: top_theme_config(), + '1/themes/other1': mdc(top_theme_config(), { + 'dividers': { + 'left': { + 'hard': '!>', + } + } + }), + '1/themes/other2': mdc(top_theme_config(), { + 'dividers': { + 'left': { + 'hard': '>>', + } + } + }), +} + + +def mkdir_recursive(directory): + if os.path.isdir(directory): + return + mkdir_recursive(os.path.dirname(directory)) + os.mkdir(directory) + + +class TestPowerline(Powerline): + def get_config_paths(self): + return tuple(sorted([ + os.path.join(CONFIG_DIR, d) + for d in os.listdir(CONFIG_DIR) + ])) + + +class WithConfigTree(object): + __slots__ = ('tree', 'p', 'p_kwargs') + + def __init__(self, tree, p_kwargs={'run_once': True}): + self.tree = tree + self.p = None + self.p_kwargs = p_kwargs + + def __enter__(self, *args): + os.mkdir(CONFIG_DIR) + for k, v in self.tree.items(): + fname = os.path.join(CONFIG_DIR, k) + '.json' + mkdir_recursive(os.path.dirname(fname)) + with open(fname, 'w') as F: + json.dump(v, F) + select_renderer(simpler_renderer=True) + self.p = TestPowerline( + ext='test', + renderer_module='tests.modules.lib.config_mock', + **self.p_kwargs + ) + if os.environ.get('POWERLINE_RUN_LINT_DURING_TESTS'): + try: + check_call(['scripts/powerline-lint'] + reduce(add, ( + ['-p', d] for d in self.p.get_config_paths() + ))) + except: + self.__exit__() + raise + return self.p.__enter__(*args) + + def __exit__(self, *args): + try: + rmtree(CONFIG_DIR) + finally: + if self.p: + self.p.__exit__(*args) + + +class TestMerging(TestCase): + def assertRenderEqual(self, p, output, **kwargs): + self.assertEqual(p.render(**kwargs).replace(' ', ' '), output) + + def test_not_merged_config(self): + with WithConfigTree(main_tree()) as p: + self.assertRenderEqual(p, '{12} bt{2-}#>{--}') + + def test_root_config_merging(self): + with WithConfigTree(mdc(main_tree(), { + '2/config': { + 'common': { + 'default_top_theme': 'other1', + } + }, + })) as p: + self.assertRenderEqual(p, '{12} bt{2-}!>{--}') + with WithConfigTree(mdc(main_tree(), { + '2/config': { + 'common': { + 'default_top_theme': 'other1', + } + }, + '3/config': { + 'common': { + 'default_top_theme': 'other2', + } + }, + })) as p: + self.assertRenderEqual(p, '{12} bt{2-}>>{--}') + + def test_top_theme_merging(self): + with WithConfigTree(mdc(main_tree(), { + '2/themes/' + UT: { + 'spaces': 1, + }, + '3/themes/' + UT: { + 'dividers': { + 'left': { + 'hard': '>>', + } + } + }, + })) as p: + self.assertRenderEqual(p, '{12} bt {2-}>>{--}') + + def test_colors_config_merging(self): + with WithConfigTree(mdc(main_tree(), { + '2/colors': { + 'colors': { + 'c1': 3, + } + }, + })) as p: + self.assertRenderEqual(p, '{32} bt{2-}#>{--}') + with WithConfigTree(mdc(main_tree(), { + '2/colors': { + 'colors': { + 'c1': 3, + } + }, + '3/colors': { + 'colors': { + 'c1': 4, + } + }, + })) as p: + self.assertRenderEqual(p, '{42} bt{2-}#>{--}') + with WithConfigTree(mdc(main_tree(), { + '2/colors': { + 'colors': { + 'c1': 3, + } + }, + '3/colors': { + 'colors': { + 'c2': 4, + } + }, + })) as p: + self.assertRenderEqual(p, '{34} bt{4-}#>{--}') + + def test_colorschemes_merging(self): + with WithConfigTree(mdc(main_tree(), { + '2/colorschemes/default': { + 'groups': { + 'g': {'fg': 'c2', 'bg': 'c1', 'attrs': []}, + } + }, + })) as p: + self.assertRenderEqual(p, '{21} bt{1-}#>{--}') + + def test_theme_merging(self): + with WithConfigTree(mdc(main_tree(), { + '2/themes/test/default': { + 'segment_data': { + 's': { + 'after': 'a', + } + } + }, + })) as p: + self.assertRenderEqual(p, '{12} bta{2-}#>{--}') + + +if __name__ == '__main__': + from tests.modules import main + main() diff --git a/tests/test_python/test_config_reload.py b/tests/test_python/test_config_reload.py new file mode 100644 index 0000000..a418d49 --- /dev/null +++ b/tests/test_python/test_config_reload.py @@ -0,0 +1,319 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +from time import sleep +from copy import deepcopy +from functools import wraps + +from tests.modules import TestCase +from tests.modules.lib.config_mock import get_powerline, add_watcher_events, UT + + +config = { + 'config': { + 'common': { + 'interval': 0, + 'watcher': 'test', + }, + 'ext': { + 'test': { + 'theme': 'default', + 'colorscheme': 'default', + }, + }, + }, + 'colors': { + 'colors': { + "col1": 1, + "col2": 2, + "col3": 3, + "col4": 4, + }, + 'gradients': { + }, + }, + 'colorschemes/test/default': { + 'groups': { + 'str1': {'fg': 'col1', 'bg': 'col2', 'attrs': ['bold']}, + 'str2': {'fg': 'col3', 'bg': 'col4', 'attrs': ['underline']}, + }, + }, + 'colorschemes/test/2': { + 'groups': { + 'str1': {'fg': 'col2', 'bg': 'col3', 'attrs': ['bold']}, + 'str2': {'fg': 'col1', 'bg': 'col4', 'attrs': ['underline']}, + }, + }, + 'themes/test/default': { + 'segments': { + "left": [ + { + "type": "string", + "contents": "s", + "highlight_groups": ["str1"], + }, + { + "type": "string", + "contents": "g", + "highlight_groups": ["str2"], + }, + ], + "right": [ + ], + }, + }, + 'themes/' + UT: { + 'dividers': { + "left": { + "hard": ">>", + "soft": ">", + }, + "right": { + "hard": "<<", + "soft": "<", + }, + }, + 'spaces': 0, + }, + 'themes/other': { + 'dividers': { + "left": { + "hard": ">>", + "soft": ">", + }, + "right": { + "hard": "<<", + "soft": "<", + }, + }, + 'spaces': 1, + }, + 'themes/test/2': { + 'segments': { + "left": [ + { + "type": "string", + "contents": "t", + "highlight_groups": ["str1"], + }, + { + "type": "string", + "contents": "b", + "highlight_groups": ["str2"], + }, + ], + "right": [ + ], + }, + }, +} + + +def with_new_config(func): + @wraps(func) + def f(self): + return func(self, deepcopy(config)) + return f + + +class TestConfigReload(TestCase): + def assertAccessEvents(self, p, *args): + events = set() + for event in args: + if ':' not in event: + events.add('check:' + event) + events.add('load:' + event) + else: + events.add(event) + self.assertEqual(set(p._pop_events()), events) + + @with_new_config + def test_noreload(self, config): + with get_powerline(config, run_once=True) as p: + self.assertEqual(p.render(), '<1 2 1> s<2 4 False>>><3 4 4>g<4 False False>>><None None None>') + self.assertAccessEvents(p, 'config', 'colors', 'check:colorschemes/default', 'check:colorschemes/test/__main__', 'colorschemes/test/default', 'themes/test/default', 'themes/' + UT, 'check:themes/test/__main__') + config['config']['common']['spaces'] = 1 + add_watcher_events(p, 'config', wait=False, interval=0.05) + # When running once thread should not start + self.assertEqual(p.render(), '<1 2 1> s<2 4 False>>><3 4 4>g<4 False False>>><None None None>') + self.assertAccessEvents(p) + self.assertEqual(p.logger._pop_msgs(), []) + + @with_new_config + def test_reload_main(self, config): + with get_powerline(config, run_once=False) as p: + self.assertEqual(p.render(), '<1 2 1> s<2 4 False>>><3 4 4>g<4 False False>>><None None None>') + self.assertAccessEvents(p, 'config', 'colors', 'check:colorschemes/default', 'check:colorschemes/test/__main__', 'colorschemes/test/default', 'themes/test/default', 'themes/' + UT, 'check:themes/test/__main__') + + config['config']['common']['default_top_theme'] = 'other' + add_watcher_events(p, 'config') + p.render() + self.assertEqual(p.render(), '<1 2 1> s <2 4 False>>><3 4 4>g <4 False False>>><None None None>') + self.assertAccessEvents(p, 'config', 'themes/other', 'check:themes/test/__main__', 'themes/test/default') + self.assertEqual(p.logger._pop_msgs(), []) + + config['config']['ext']['test']['theme'] = 'nonexistent' + add_watcher_events(p, 'config') + self.assertEqual(p.render(), '<1 2 1> s <2 4 False>>><3 4 4>g <4 False False>>><None None None>') + self.assertAccessEvents(p, 'config', 'check:themes/test/nonexistent', 'themes/other', 'check:themes/test/__main__') + # It should normally handle file missing error + self.assertEqual(p.logger._pop_msgs(), [ + 'exception:test:powerline:Failed to load theme: themes/test/__main__', + 'exception:test:powerline:Failed to load theme: themes/test/nonexistent', + 'exception:test:powerline:Failed to create renderer: themes/test/nonexistent' + ]) + + config['config']['ext']['test']['theme'] = 'default' + add_watcher_events(p, 'config') + self.assertEqual(p.render(), '<1 2 1> s <2 4 False>>><3 4 4>g <4 False False>>><None None None>') + self.assertAccessEvents(p, 'config', 'themes/test/default', 'themes/other', 'check:themes/test/__main__') + self.assertEqual(p.logger._pop_msgs(), []) + + config['config']['ext']['test']['colorscheme'] = 'nonexistent' + add_watcher_events(p, 'config') + self.assertEqual(p.render(), '<1 2 1> s <2 4 False>>><3 4 4>g <4 False False>>><None None None>') + self.assertAccessEvents(p, 'config', 'check:colorschemes/nonexistent', 'check:colorschemes/test/__main__', 'check:colorschemes/test/nonexistent') + # It should normally handle file missing error + self.assertEqual(p.logger._pop_msgs(), [ + 'exception:test:powerline:Failed to load colorscheme: colorschemes/nonexistent', + 'exception:test:powerline:Failed to load colorscheme: colorschemes/test/__main__', + 'exception:test:powerline:Failed to load colorscheme: colorschemes/test/nonexistent', + 'exception:test:powerline:Failed to create renderer: colorschemes/test/nonexistent' + ]) + + config['config']['ext']['test']['colorscheme'] = '2' + add_watcher_events(p, 'config') + self.assertEqual(p.render(), '<2 3 1> s <3 4 False>>><1 4 4>g <4 False False>>><None None None>') + self.assertAccessEvents(p, 'config', 'check:colorschemes/2', 'check:colorschemes/test/__main__', 'colorschemes/test/2') + self.assertEqual(p.logger._pop_msgs(), []) + + config['config']['ext']['test']['theme'] = '2' + add_watcher_events(p, 'config') + self.assertEqual(p.render(), '<2 3 1> t <3 4 False>>><1 4 4>b <4 False False>>><None None None>') + self.assertAccessEvents(p, 'config', 'themes/test/2', 'themes/other', 'check:themes/test/__main__') + self.assertEqual(p.logger._pop_msgs(), []) + + self.assertEqual(p.renderer.local_themes, None) + config['config']['ext']['test']['local_themes'] = 'something' + add_watcher_events(p, 'config') + self.assertEqual(p.render(), '<2 3 1> t <3 4 False>>><1 4 4>b <4 False False>>><None None None>') + self.assertAccessEvents(p, 'config') + self.assertEqual(p.logger._pop_msgs(), []) + self.assertEqual(p.renderer.local_themes, 'something') + + @with_new_config + def test_reload_unexistent(self, config): + with get_powerline(config, run_once=False) as p: + self.assertEqual(p.render(), '<1 2 1> s<2 4 False>>><3 4 4>g<4 False False>>><None None None>') + self.assertAccessEvents(p, 'config', 'colors', 'check:colorschemes/default', 'check:colorschemes/test/__main__', 'colorschemes/test/default', 'themes/test/default', 'themes/' + UT, 'check:themes/test/__main__') + + config['config']['ext']['test']['colorscheme'] = 'nonexistentraise' + add_watcher_events(p, 'config') + # It may appear that p.logger._pop_msgs() is called after given + # exception is added to the mesagges, but before config_loader + # exception was added (this one: + # “exception:test:config_loader:Error while running condition + # function for key colorschemes/test/nonexistentraise: + # fcf:colorschemes/test/nonexistentraise”). + # sleep(0.1) + self.assertEqual(p.render(), '<1 2 1> s<2 4 False>>><3 4 4>g<4 False False>>><None None None>') + # For colorschemes/{test/,}*raise find_config_file raises + # IOError, but it does not do so for check:colorschemes/test/__main__, + # so powerline is trying to load this, but not other + # colorschemes/* + self.assertAccessEvents(p, 'config', 'check:colorschemes/test/__main__', 'check:colorschemes/nonexistentraise', 'check:colorschemes/test/nonexistentraise') + self.assertIn('exception:test:powerline:Failed to create renderer: fcf:colorschemes/test/nonexistentraise', p.logger._pop_msgs()) + + config['colorschemes/nonexistentraise'] = {} + config['colorschemes/test/nonexistentraise'] = { + 'groups': { + 'str1': {'fg': 'col1', 'bg': 'col3', 'attrs': ['bold']}, + 'str2': {'fg': 'col2', 'bg': 'col4', 'attrs': ['underline']}, + }, + } + while not p._will_create_renderer(): + sleep(0.1) + self.assertEqual(p.render(), '<1 3 1> s<3 4 False>>><2 4 4>g<4 False False>>><None None None>') + # Same as above + self.assertAccessEvents(p, 'colorschemes/nonexistentraise', 'colorschemes/test/nonexistentraise', 'check:colorschemes/test/__main__') + self.assertEqual(p.logger._pop_msgs(), []) + + @with_new_config + def test_reload_colors(self, config): + with get_powerline(config, run_once=False) as p: + self.assertEqual(p.render(), '<1 2 1> s<2 4 False>>><3 4 4>g<4 False False>>><None None None>') + self.assertAccessEvents(p, 'config', 'colors', 'check:colorschemes/default', 'check:colorschemes/test/__main__', 'colorschemes/test/default', 'themes/test/default', 'themes/' + UT, 'check:themes/test/__main__') + + config['colors']['colors']['col1'] = 5 + add_watcher_events(p, 'colors') + self.assertEqual(p.render(), '<5 2 1> s<2 4 False>>><3 4 4>g<4 False False>>><None None None>') + self.assertAccessEvents(p, 'colors') + self.assertEqual(p.logger._pop_msgs(), []) + + @with_new_config + def test_reload_colorscheme(self, config): + with get_powerline(config, run_once=False) as p: + self.assertEqual(p.render(), '<1 2 1> s<2 4 False>>><3 4 4>g<4 False False>>><None None None>') + self.assertAccessEvents(p, 'config', 'colors', 'check:colorschemes/default', 'check:colorschemes/test/__main__', 'colorschemes/test/default', 'themes/test/default', 'themes/' + UT, 'check:themes/test/__main__') + + config['colorschemes/test/default']['groups']['str1']['bg'] = 'col3' + add_watcher_events(p, 'colorschemes/test/default') + self.assertEqual(p.render(), '<1 3 1> s<3 4 False>>><3 4 4>g<4 False False>>><None None None>') + self.assertAccessEvents(p, 'check:colorschemes/default', 'check:colorschemes/test/__main__', 'colorschemes/test/default') + self.assertEqual(p.logger._pop_msgs(), []) + + @with_new_config + def test_reload_theme(self, config): + with get_powerline(config, run_once=False) as p: + self.assertEqual(p.render(), '<1 2 1> s<2 4 False>>><3 4 4>g<4 False False>>><None None None>') + self.assertAccessEvents(p, 'config', 'colors', 'check:colorschemes/default', 'check:colorschemes/test/__main__', 'colorschemes/test/default', 'themes/test/default', 'themes/' + UT, 'check:themes/test/__main__') + + config['themes/test/default']['segments']['left'][0]['contents'] = 'col3' + add_watcher_events(p, 'themes/test/default') + self.assertEqual(p.render(), '<1 2 1> col3<2 4 False>>><3 4 4>g<4 False False>>><None None None>') + self.assertAccessEvents(p, 'themes/test/default', 'themes/' + UT, 'check:themes/test/__main__') + self.assertEqual(p.logger._pop_msgs(), []) + + @with_new_config + def test_reload_top_theme(self, config): + with get_powerline(config, run_once=False) as p: + self.assertEqual(p.render(), '<1 2 1> s<2 4 False>>><3 4 4>g<4 False False>>><None None None>') + self.assertAccessEvents(p, 'config', 'colors', 'check:colorschemes/default', 'check:colorschemes/test/__main__', 'colorschemes/test/default', 'themes/test/default', 'themes/' + UT, 'check:themes/test/__main__') + + config['themes/' + UT]['dividers']['left']['hard'] = '|>' + add_watcher_events(p, 'themes/' + UT) + self.assertEqual(p.render(), '<1 2 1> s<2 4 False>|><3 4 4>g<4 False False>|><None None None>') + self.assertAccessEvents(p, 'themes/test/default', 'themes/' + UT, 'check:themes/test/__main__') + self.assertEqual(p.logger._pop_msgs(), []) + + @with_new_config + def test_reload_theme_main(self, config): + config['config']['common']['interval'] = None + with get_powerline(config, run_once=False) as p: + self.assertEqual(p.render(), '<1 2 1> s<2 4 False>>><3 4 4>g<4 False False>>><None None None>') + self.assertAccessEvents(p, 'config', 'colors', 'check:colorschemes/default', 'check:colorschemes/test/__main__', 'colorschemes/test/default', 'themes/test/default', 'themes/' + UT, 'check:themes/test/__main__') + + config['themes/test/default']['segments']['left'][0]['contents'] = 'col3' + add_watcher_events(p, 'themes/test/default', wait=False) + self.assertEqual(p.render(), '<1 2 1> col3<2 4 False>>><3 4 4>g<4 False False>>><None None None>') + self.assertAccessEvents(p, 'themes/test/default', 'themes/' + UT, 'check:themes/test/__main__') + self.assertEqual(p.logger._pop_msgs(), []) + self.assertTrue(p._watcher._calls) + + @with_new_config + def test_run_once_no_theme_reload(self, config): + config['config']['common']['interval'] = None + with get_powerline(config, run_once=True) as p: + self.assertEqual(p.render(), '<1 2 1> s<2 4 False>>><3 4 4>g<4 False False>>><None None None>') + self.assertAccessEvents(p, 'config', 'colors', 'check:colorschemes/default', 'check:colorschemes/test/__main__', 'colorschemes/test/default', 'themes/test/default', 'themes/' + UT, 'check:themes/test/__main__') + + config['themes/test/default']['segments']['left'][0]['contents'] = 'col3' + add_watcher_events(p, 'themes/test/default', wait=False) + self.assertEqual(p.render(), '<1 2 1> s<2 4 False>>><3 4 4>g<4 False False>>><None None None>') + self.assertAccessEvents(p) + self.assertEqual(p.logger._pop_msgs(), []) + + +if __name__ == '__main__': + from tests.modules import main + main() diff --git a/tests/test_python/test_configuration.py b/tests/test_python/test_configuration.py new file mode 100644 index 0000000..aa9e844 --- /dev/null +++ b/tests/test_python/test_configuration.py @@ -0,0 +1,877 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import sys +import os + +from functools import wraps +from copy import deepcopy + +import tests.modules.vim as vim_module + +from tests.modules import TestCase +from tests.modules.lib.config_mock import (get_powerline, get_powerline_raw, + swap_attributes, UT) +from tests.modules.lib import Args, replace_item + + +def highlighted_string(s, group, **kwargs): + ret = { + 'type': 'string', + 'contents': s, + 'highlight_groups': [group], + } + ret.update(kwargs) + return ret + + +config = { + 'config': { + 'common': { + 'interval': 0, + 'watcher': 'test', + }, + 'ext': { + 'test': { + 'theme': 'default', + 'colorscheme': 'default', + }, + 'vim': { + 'theme': 'default', + 'colorscheme': 'default', + }, + 'shell': { + 'theme': 'default', + 'colorscheme': 'default', + }, + 'wm': { + 'theme': 'default', + 'colorscheme': 'default', + }, + }, + }, + 'colors': { + 'colors': { + 'col1': 1, + 'col2': 2, + 'col3': 3, + 'col4': 4, + 'col5': 5, + 'col6': 6, + 'col7': 7, + 'col8': 8, + 'col9': 9, + 'col10': 10, + 'col11': 11, + 'col12': 12, + }, + 'gradients': { + }, + }, + 'colorschemes/test/__main__': { + 'groups': { + 'm1': 'g1', + 'm2': 'g3', + 'm3': {'fg': 'col11', 'bg': 'col12', 'attrs': []}, + } + }, + 'colorschemes/default': { + 'groups': { + 'g1': {'fg': 'col5', 'bg': 'col6', 'attrs': []}, + 'g2': {'fg': 'col7', 'bg': 'col8', 'attrs': []}, + 'g3': {'fg': 'col9', 'bg': 'col10', 'attrs': []}, + } + }, + 'colorschemes/test/default': { + 'groups': { + 'str1': {'fg': 'col1', 'bg': 'col2', 'attrs': ['bold']}, + 'str2': {'fg': 'col3', 'bg': 'col4', 'attrs': ['underline']}, + 'd1': 'g2', + 'd2': 'm2', + 'd3': 'm3', + }, + }, + 'colorschemes/vim/default': { + 'groups': { + 'environment': {'fg': 'col3', 'bg': 'col4', 'attrs': ['underline']}, + }, + }, + 'colorschemes/wm/default': { + 'groups': { + 'hl1': {'fg': 'col1', 'bg': 'col2', 'attrs': ['underline']}, + 'hl2': {'fg': 'col2', 'bg': 'col1', 'attrs': []}, + 'hl3': {'fg': 'col3', 'bg': 'col1', 'attrs': ['underline']}, + 'hl4': {'fg': 'col2', 'bg': 'col4', 'attrs': []}, + }, + }, + 'themes/test/default': { + 'segments': { + 'left': [ + highlighted_string('s', 'str1', width='auto'), + highlighted_string('g', 'str2'), + ], + 'right': [ + highlighted_string('f', 'str2', width='auto', align='r'), + ], + }, + }, + 'themes/' + UT: { + 'dividers': { + 'left': { + 'hard': '>>', + 'soft': '>', + }, + 'right': { + 'hard': '<<', + 'soft': '<', + }, + }, + 'spaces': 0, + }, + 'themes/test/__main__': { + 'dividers': { + 'right': { + 'soft': '|', + }, + }, + }, + 'themes/vim/default': { + 'segments': { + 'left': [ + { + 'function': 'powerline.segments.common.env.environment', + 'args': { + 'variable': 'TEST', + }, + }, + ], + }, + }, + 'themes/shell/default': { + 'default_module': 'powerline.segments.common', + 'segments': { + 'left': [ + highlighted_string('s', 'g1', width='auto'), + ], + }, + }, + 'themes/wm/default': { + 'default_module': 'powerline.segments.common', + 'segments': { + 'left': [ + highlighted_string('A', 'hl1'), + highlighted_string('B', 'hl2'), + ], + 'right': [ + highlighted_string('C', 'hl3'), + highlighted_string('D', 'hl4'), + ], + }, + }, +} + + +def with_new_config(func): + @wraps(func) + def f(self): + return func(self, deepcopy(config)) + return f + + +def add_args(func): + @wraps(func) + def f(self): + new_config = deepcopy(config) + with get_powerline(new_config, run_once=True, simpler_renderer=True) as p: + func(self, p, new_config) + return f + + +class TestRender(TestCase): + def assertRenderEqual(self, p, output, **kwargs): + self.assertEqual(p.render(**kwargs).replace(' ', ' '), output) + + def assertRenderLinesEqual(self, p, output, **kwargs): + self.assertEqual([l.replace(' ', ' ') for l in p.render_above_lines(**kwargs)], output) + + +class TestLines(TestRender): + @add_args + def test_without_above(self, p, config): + self.assertRenderEqual(p, '{121} s{24}>>{344}g{34}>{34}|{344}f {--}') + self.assertRenderEqual(p, '{121} s {24}>>{344}g{34}>{34}|{344}f {--}', width=10) + # self.assertRenderEqual(p, '{121} s {24}>>{344}g{34}>{34}|{344} f {--}', width=11) + self.assertEqual(list(p.render_above_lines()), []) + + @with_new_config + def test_with_above(self, config): + old_segments = deepcopy(config['themes/test/default']['segments']) + config['themes/test/default']['segments']['above'] = [old_segments] + with get_powerline(config, run_once=True, simpler_renderer=True) as p: + self.assertRenderLinesEqual(p, [ + '{121} s{24}>>{344}g{34}>{34}|{344}f {--}', + ]) + self.assertRenderLinesEqual(p, [ + '{121} s {24}>>{344}g{34}>{34}|{344}f {--}', + ], width=10) + + config['themes/test/default']['segments']['above'] = [old_segments] * 2 + with get_powerline(config, run_once=True, simpler_renderer=True) as p: + self.assertRenderLinesEqual(p, [ + '{121} s{24}>>{344}g{34}>{34}|{344}f {--}', + '{121} s{24}>>{344}g{34}>{34}|{344}f {--}', + ]) + self.assertRenderLinesEqual(p, [ + '{121} s {24}>>{344}g{34}>{34}|{344}f {--}', + '{121} s {24}>>{344}g{34}>{34}|{344}f {--}', + ], width=10) + + +class TestSegments(TestRender): + @add_args + def test_display(self, p, config): + config['themes/test/default']['segments']['left'][0]['display'] = False + config['themes/test/default']['segments']['left'][1]['display'] = True + config['themes/test/default']['segments']['right'][0]['display'] = False + self.assertRenderEqual(p, '{344} g{4-}>>{--}') + + +class TestColorschemesHierarchy(TestRender): + @add_args + def test_group_redirects(self, p, config): + config['themes/test/default']['segments'] = { + 'left': [ + highlighted_string('a', 'd1', draw_hard_divider=False), + highlighted_string('b', 'd2', draw_hard_divider=False), + highlighted_string('c', 'd3', draw_hard_divider=False), + highlighted_string('A', 'm1', draw_hard_divider=False), + highlighted_string('B', 'm2', draw_hard_divider=False), + highlighted_string('C', 'm3', draw_hard_divider=False), + highlighted_string('1', 'g1', draw_hard_divider=False), + highlighted_string('2', 'g2', draw_hard_divider=False), + highlighted_string('3', 'g3', draw_hard_divider=False), + ], + 'right': [], + } + self.assertRenderEqual(p, '{78} a{910}b{1112}c{56}A{910}B{1112}C{56}1{78}2{910}3{--}') + self.assertEqual(p.logger._pop_msgs(), []) + + @add_args + def test_group_redirects_no_main(self, p, config): + del config['colorschemes/test/__main__'] + config['themes/test/default']['segments'] = { + 'left': [ + highlighted_string('a', 'd1', draw_hard_divider=False), + highlighted_string('1', 'g1', draw_hard_divider=False), + highlighted_string('2', 'g2', draw_hard_divider=False), + highlighted_string('3', 'g3', draw_hard_divider=False), + ], + 'right': [], + } + self.assertRenderEqual(p, '{78} a{56}1{78}2{910}3{--}') + self.assertEqual(p.logger._pop_msgs(), []) + + @add_args + def test_group_redirects_no_top_default(self, p, config): + del config['colorschemes/default'] + config['themes/test/default']['segments'] = { + 'left': [ + highlighted_string('c', 'd3', draw_soft_divider=False), + highlighted_string('C', 'm3', draw_hard_divider=False), + ], + 'right': [], + } + self.assertRenderEqual(p, '{1112} c{1112}C{--}') + self.assertEqual(p.logger._pop_msgs(), []) + + @add_args + def test_group_redirects_no_test_default(self, p, config): + del config['colorschemes/test/default'] + config['themes/test/default']['segments'] = { + 'left': [ + highlighted_string('A', 'm1', draw_hard_divider=False), + highlighted_string('B', 'm2', draw_hard_divider=False), + highlighted_string('C', 'm3', draw_hard_divider=False), + highlighted_string('1', 'g1', draw_hard_divider=False), + highlighted_string('2', 'g2', draw_hard_divider=False), + highlighted_string('3', 'g3', draw_hard_divider=False), + ], + 'right': [], + } + self.assertRenderEqual(p, '{56} A{910}B{1112}C{56}1{78}2{910}3{--}') + self.assertEqual(p.logger._pop_msgs(), []) + + @add_args + def test_group_redirects_only_main(self, p, config): + del config['colorschemes/default'] + del config['colorschemes/test/default'] + config['themes/test/default']['segments'] = { + 'left': [ + highlighted_string('C', 'm3', draw_hard_divider=False), + ], + 'right': [], + } + # Powerline is not able to work without default colorscheme + # somewhere, thus it will output error string + self.assertRenderEqual(p, 'colorschemes/test/default') + self.assertEqual(p.logger._pop_msgs(), [ + 'exception:test:powerline:Failed to load colorscheme: colorschemes/default', + 'exception:test:powerline:Failed to load colorscheme: colorschemes/test/default', + 'exception:test:powerline:Failed to create renderer: colorschemes/test/default', + 'exception:test:powerline:Failed to render: colorschemes/test/default', + ]) + + @add_args + def test_group_redirects_only_top_default(self, p, config): + del config['colorschemes/test/__main__'] + del config['colorschemes/test/default'] + config['themes/test/default']['segments'] = { + 'left': [ + highlighted_string('1', 'g1', draw_hard_divider=False), + highlighted_string('2', 'g2', draw_hard_divider=False), + highlighted_string('3', 'g3', draw_hard_divider=False), + ], + 'right': [], + } + self.assertRenderEqual(p, '{56} 1{78}2{910}3{--}') + self.assertEqual(p.logger._pop_msgs(), []) + + @add_args + def test_group_redirects_only_test_default(self, p, config): + del config['colorschemes/default'] + del config['colorschemes/test/__main__'] + config['themes/test/default']['segments'] = { + 'left': [ + highlighted_string('s', 'str1', draw_hard_divider=False), + ], + 'right': [], + } + self.assertRenderEqual(p, '{121} s{--}') + self.assertEqual(p.logger._pop_msgs(), []) + + +class TestThemeHierarchy(TestRender): + @add_args + def test_hierarchy(self, p, config): + self.assertRenderEqual(p, '{121} s{24}>>{344}g{34}>{34}|{344}f {--}') + + @add_args + def test_no_main(self, p, config): + del config['themes/test/__main__'] + self.assertRenderEqual(p, '{121} s{24}>>{344}g{34}>{34}<{344}f {--}') + self.assertEqual(p.logger._pop_msgs(), []) + + @add_args + def test_no_powerline(self, p, config): + config['themes/test/__main__']['dividers'] = config['themes/' + UT]['dividers'] + config['themes/test/__main__']['spaces'] = 1 + del config['themes/' + UT] + self.assertRenderEqual(p, '{121} s {24}>>{344}g {34}>{34}<{344} f {--}') + self.assertEqual(p.logger._pop_msgs(), []) + + @add_args + def test_no_default(self, p, config): + del config['themes/test/default'] + self.assertRenderEqual(p, 'themes/test/default') + self.assertEqual(p.logger._pop_msgs(), [ + 'exception:test:powerline:Failed to load theme: themes/test/default', + 'exception:test:powerline:Failed to create renderer: themes/test/default', + 'exception:test:powerline:Failed to render: themes/test/default', + ]) + + @add_args + def test_only_default(self, p, config): + config['themes/test/default']['dividers'] = config['themes/' + UT]['dividers'] + config['themes/test/default']['spaces'] = 1 + del config['themes/test/__main__'] + del config['themes/' + UT] + self.assertRenderEqual(p, '{121} s {24}>>{344}g {34}>{34}<{344} f {--}') + + @add_args + def test_only_main(self, p, config): + del config['themes/test/default'] + del config['themes/' + UT] + self.assertRenderEqual(p, 'themes/test/default') + self.assertEqual(p.logger._pop_msgs(), [ + 'exception:test:powerline:Failed to load theme: themes/' + UT, + 'exception:test:powerline:Failed to load theme: themes/test/default', + 'exception:test:powerline:Failed to create renderer: themes/test/default', + 'exception:test:powerline:Failed to render: themes/test/default', + ]) + + @add_args + def test_only_powerline(self, p, config): + del config['themes/test/default'] + del config['themes/test/__main__'] + self.assertRenderEqual(p, 'themes/test/default') + self.assertEqual(p.logger._pop_msgs(), [ + 'exception:test:powerline:Failed to load theme: themes/test/__main__', + 'exception:test:powerline:Failed to load theme: themes/test/default', + 'exception:test:powerline:Failed to create renderer: themes/test/default', + 'exception:test:powerline:Failed to render: themes/test/default', + ]) + + @add_args + def test_nothing(self, p, config): + del config['themes/test/default'] + del config['themes/' + UT] + del config['themes/test/__main__'] + self.assertRenderEqual(p, 'themes/test/default') + self.assertEqual(p.logger._pop_msgs(), [ + 'exception:test:powerline:Failed to load theme: themes/' + UT, + 'exception:test:powerline:Failed to load theme: themes/test/__main__', + 'exception:test:powerline:Failed to load theme: themes/test/default', + 'exception:test:powerline:Failed to create renderer: themes/test/default', + 'exception:test:powerline:Failed to render: themes/test/default', + ]) + + +class TestDisplayCondition(TestRender): + @add_args + def test_include_modes(self, p, config): + config['themes/test/default']['segments'] = { + 'left': [ + highlighted_string('s1', 'g1', include_modes=['m1']), + highlighted_string('s2', 'g1', include_modes=['m1', 'm2']), + highlighted_string('s3', 'g1', include_modes=['m3']), + ] + } + self.assertRenderEqual(p, '{--}') + self.assertRenderEqual(p, '{56} s1{56}>{56}s2{6-}>>{--}', mode='m1') + self.assertRenderEqual(p, '{56} s2{6-}>>{--}', mode='m2') + self.assertRenderEqual(p, '{56} s3{6-}>>{--}', mode='m3') + + @add_args + def test_exclude_modes(self, p, config): + config['themes/test/default']['segments'] = { + 'left': [ + highlighted_string('s1', 'g1', exclude_modes=['m1']), + highlighted_string('s2', 'g1', exclude_modes=['m1', 'm2']), + highlighted_string('s3', 'g1', exclude_modes=['m3']), + ] + } + self.assertRenderEqual(p, '{56} s1{56}>{56}s2{56}>{56}s3{6-}>>{--}') + self.assertRenderEqual(p, '{56} s3{6-}>>{--}', mode='m1') + self.assertRenderEqual(p, '{56} s1{56}>{56}s3{6-}>>{--}', mode='m2') + self.assertRenderEqual(p, '{56} s1{56}>{56}s2{6-}>>{--}', mode='m3') + + @add_args + def test_exinclude_modes(self, p, config): + config['themes/test/default']['segments'] = { + 'left': [ + highlighted_string('s1', 'g1', exclude_modes=['m1'], include_modes=['m2']), + highlighted_string('s2', 'g1', exclude_modes=['m1', 'm2'], include_modes=['m3']), + highlighted_string('s3', 'g1', exclude_modes=['m3'], include_modes=['m3']), + ] + } + self.assertRenderEqual(p, '{--}') + self.assertRenderEqual(p, '{--}', mode='m1') + self.assertRenderEqual(p, '{56} s1{6-}>>{--}', mode='m2') + self.assertRenderEqual(p, '{56} s2{6-}>>{--}', mode='m3') + + @add_args + def test_exinclude_function_nonexistent_module(self, p, config): + config['themes/test/default']['segments'] = { + 'left': [ + highlighted_string('s1', 'g1', exclude_function='xxx_nonexistent_module.foo'), + highlighted_string('s2', 'g1', exclude_function='xxx_nonexistent_module.foo', include_function='xxx_nonexistent_module.bar'), + highlighted_string('s3', 'g1', include_function='xxx_nonexistent_module.bar'), + ] + } + self.assertRenderEqual(p, '{56} s1{56}>{56}s2{56}>{56}s3{6-}>>{--}') + + @add_args + def test_exinclude_function(self, p, config): + config['themes/test/default']['segments'] = { + 'left': [ + highlighted_string('s1', 'g1', exclude_function='mod.foo'), + highlighted_string('s2', 'g1', exclude_function='mod.foo', include_function='mod.bar'), + highlighted_string('s3', 'g1', include_function='mod.bar'), + ] + } + launched = set() + fool = [None] + barl = [None] + + def foo(*args, **kwargs): + launched.add('foo') + self.assertEqual(set(kwargs.keys()), set(('pl', 'segment_info', 'mode'))) + self.assertEqual(args, ()) + return fool[0] + + def bar(*args, **kwargs): + launched.add('bar') + self.assertEqual(set(kwargs.keys()), set(('pl', 'segment_info', 'mode'))) + self.assertEqual(args, ()) + return barl[0] + + with replace_item(sys.modules, 'mod', Args(foo=foo, bar=bar)): + fool[0] = True + barl[0] = True + self.assertRenderEqual(p, '{56} s3{6-}>>{--}') + self.assertEqual(launched, set(('foo', 'bar'))) + + fool[0] = False + barl[0] = True + self.assertRenderEqual(p, '{56} s1{56}>{56}s2{56}>{56}s3{6-}>>{--}') + self.assertEqual(launched, set(('foo', 'bar'))) + + fool[0] = False + barl[0] = False + self.assertRenderEqual(p, '{56} s1{6-}>>{--}') + self.assertEqual(launched, set(('foo', 'bar'))) + + fool[0] = True + barl[0] = False + self.assertRenderEqual(p, '{--}') + self.assertEqual(launched, set(('foo', 'bar'))) + + @add_args + def test_exinclude_modes_override_functions(self, p, config): + config['themes/test/default']['segments'] = { + 'left': [ + highlighted_string('s1', 'g1', exclude_function='mod.foo', exclude_modes=['m2']), + highlighted_string('s2', 'g1', exclude_function='mod.foo', include_modes=['m2']), + highlighted_string('s3', 'g1', include_function='mod.foo', exclude_modes=['m2']), + highlighted_string('s4', 'g1', include_function='mod.foo', include_modes=['m2']), + ] + } + fool = [None] + + def foo(*args, **kwargs): + return fool[0] + + with replace_item(sys.modules, 'mod', Args(foo=foo)): + fool[0] = True + self.assertRenderEqual(p, '{56} s4{6-}>>{--}', mode='m2') + self.assertRenderEqual(p, '{56} s3{56}>{56}s4{6-}>>{--}', mode='m1') + + fool[0] = False + self.assertRenderEqual(p, '{56} s2{56}>{56}s4{6-}>>{--}', mode='m2') + self.assertRenderEqual(p, '{56} s1{6-}>>{--}', mode='m1') + + +class TestOuterPadding(TestRender): + @add_args + def test_outer_padding_left(self, p, config): + config['themes/' + UT]['outer_padding'] = 5 + self.assertRenderEqual(p, '{121} s{24}>>{344}g{4-}>>{--}', side='left') + + @add_args + def test_outer_padding_right(self, p, config): + config['themes/' + UT]['outer_padding'] = 5 + self.assertRenderEqual(p, '{4-}<<{344}f {--}', side='right') + + @add_args + def test_outer_padding_ten(self, p, config): + config['themes/' + UT]['outer_padding'] = 10 + self.assertRenderEqual(p, '{121} s {24}>>{344}g{34}>{34}|{344} f {--}', width=30) + + @add_args + def test_outer_padding_zero(self, p, config): + config['themes/' + UT]['outer_padding'] = 0 + self.assertRenderEqual(p, '{121}s {24}>>{344}g{34}>{34}|{344} f{--}', width=30) + + +class TestSegmentAttributes(TestRender): + @add_args + def test_no_attributes(self, p, config): + def m1(divider=',', **kwargs): + return divider.join(kwargs.keys()) + divider + config['themes/test/default']['segments'] = { + 'left': [ + { + 'function': 'bar.m1' + } + ] + } + with replace_item(sys.modules, 'bar', Args(m1=m1)): + self.assertRenderEqual(p, '{56} pl,{6-}>>{--}') + + @add_args + def test_segment_datas(self, p, config): + def m1(divider=',', **kwargs): + return divider.join(kwargs.keys()) + divider + m1.powerline_segment_datas = { + UT: { + 'args': { + 'divider': ';' + } + }, + 'ascii': { + 'args': { + 'divider': '--' + } + } + } + config['themes/test/default']['segments'] = { + 'left': [ + { + 'function': 'bar.m1' + } + ] + } + with replace_item(sys.modules, 'bar', Args(m1=m1)): + self.assertRenderEqual(p, '{56} pl;{6-}>>{--}') + + @add_args + def test_expand(self, p, config): + def m1(divider=',', **kwargs): + return divider.join(kwargs.keys()) + divider + + def expand(pl, amount, segment, **kwargs): + return ('-' * amount) + segment['contents'] + + m1.expand = expand + config['themes/test/default']['segments'] = { + 'left': [ + { + 'function': 'bar.m1', + 'width': 'auto' + } + ] + } + with replace_item(sys.modules, 'bar', Args(m1=m1)): + self.assertRenderEqual(p, '{56} ----pl,{6-}>>{--}', width=10) + + @add_args + def test_truncate(self, p, config): + def m1(divider=',', **kwargs): + return divider.join(kwargs.keys()) + divider + + def truncate(pl, amount, segment, **kwargs): + return segment['contents'][:-amount] + + m1.truncate = truncate + config['themes/test/default']['segments'] = { + 'left': [ + { + 'function': 'bar.m1' + } + ] + } + with replace_item(sys.modules, 'bar', Args(m1=m1)): + self.assertRenderEqual(p, '{56} p{6-}>>{--}', width=4) + + +class TestSegmentData(TestRender): + @add_args + def test_segment_data(self, p, config): + def m1(**kwargs): + return 'S' + + def m2(**kwargs): + return 'S' + sys.modules['bar'] = Args(m1=m1, m2=m2) + config['themes/' + UT]['segment_data'] = { + 'm1': { + 'before': '1' + }, + 'bar.m2': { + 'before': '2' + }, + 'n': { + 'before': '3' + }, + 'm2': { + 'before': '4' + }, + } + config['themes/test/default']['segments'] = { + 'left': [ + { + 'function': 'bar.m1' + }, + { + 'function': 'bar.m1', + 'name': 'n' + }, + { + 'function': 'bar.m2', + 'name': 'n' + }, + { + 'function': 'bar.m2' + } + ] + } + self.assertRenderEqual(p, '{56} 1S{56}>{56}3S{610}>>{910}3S{910}>{910}2S{10-}>>{--}') + + +class TestShellEscapes(TestCase): + @with_new_config + def test_escapes(self, config): + from powerline.shell import ShellPowerline + import powerline as powerline_module + with swap_attributes(config, powerline_module): + with get_powerline_raw(config, ShellPowerline, args=Args(config_path=[''])) as powerline: + self.assertEqual(powerline.render(segment_info={}, side='left'), '\x1b[0;38;5;5;48;5;6m\xa0s\x1b[0;38;5;6;49;22m>>\x1b[0m') + + @with_new_config + def test_tmux_escapes(self, config): + from powerline.shell import ShellPowerline + import powerline as powerline_module + config['config']['common']['additional_escapes'] = 'tmux' + with swap_attributes(config, powerline_module): + with get_powerline_raw(config, ShellPowerline, args=Args(config_path=[''])) as powerline: + self.assertEqual(powerline.render(segment_info={}, side='left'), '\x1bPtmux;\x1b\x1b[0;38;5;5;48;5;6m\x1b\\\xa0s\x1bPtmux;\x1b\x1b[0;38;5;6;49;22m\x1b\\>>\x1bPtmux;\x1b\x1b[0m\x1b\\') + + @with_new_config + def test_screen_escapes(self, config): + from powerline.shell import ShellPowerline + import powerline as powerline_module + config['config']['common']['additional_escapes'] = 'screen' + with swap_attributes(config, powerline_module): + with get_powerline_raw(config, ShellPowerline, args=Args(config_path=[''])) as powerline: + self.assertEqual(powerline.render(segment_info={}, side='left'), '\x1bP\x1b\x1b[0;38;5;5;48;5;6m\x1b\\\xa0s\x1bP\x1b\x1b[0;38;5;6;49;22m\x1b\\>>\x1bP\x1b\x1b[0m\x1b\\') + + @with_new_config + def test_fbterm_escapes(self, config): + from powerline.shell import ShellPowerline + import powerline as powerline_module + config['config']['common']['term_escape_style'] = 'fbterm' + with swap_attributes(config, powerline_module): + with get_powerline_raw(config, ShellPowerline, args=Args(config_path=[''])) as powerline: + self.assertEqual(powerline.render(segment_info={}, side='left'), '\x1b[0m\x1b[1;5}\x1b[2;6}\xa0s\x1b[0m\x1b[1;6}\x1b[49m\x1b[22m>>\x1b[0m') + + @with_new_config + def test_fbterm_tmux_escapes(self, config): + from powerline.shell import ShellPowerline + import powerline as powerline_module + config['config']['common']['term_escape_style'] = 'fbterm' + config['config']['common']['additional_escapes'] = 'tmux' + with swap_attributes(config, powerline_module): + with get_powerline_raw(config, ShellPowerline, args=Args(config_path=[''])) as powerline: + self.assertEqual(powerline.render(segment_info={}, side='left'), '\x1bPtmux;\x1b\x1b[0m\x1b\x1b[1;5}\x1b\x1b[2;6}\x1b\\\xa0s\x1bPtmux;\x1b\x1b[0m\x1b\x1b[1;6}\x1b\x1b[49m\x1b\x1b[22m\x1b\\>>\x1bPtmux;\x1b\x1b[0m\x1b\\') + + @with_new_config + def test_fbterm_screen_escapes(self, config): + from powerline.shell import ShellPowerline + import powerline as powerline_module + config['config']['common']['term_escape_style'] = 'fbterm' + config['config']['common']['additional_escapes'] = 'screen' + with swap_attributes(config, powerline_module): + with get_powerline_raw(config, ShellPowerline, args=Args(config_path=[''])) as powerline: + self.assertEqual(powerline.render(segment_info={}, side='left'), '\x1bP\x1b\x1b[0m\x1b\x1b[1;5}\x1b\x1b[2;6}\x1b\\\xa0s\x1bP\x1b\x1b[0m\x1b\x1b[1;6}\x1b\x1b[49m\x1b\x1b[22m\x1b\\>>\x1bP\x1b\x1b[0m\x1b\\') + + @with_new_config + def test_term_truecolor_escapes(self, config): + from powerline.shell import ShellPowerline + import powerline as powerline_module + config['config']['common']['term_truecolor'] = True + with swap_attributes(config, powerline_module): + with get_powerline_raw(config, ShellPowerline, args=Args(config_path=[''])) as powerline: + self.assertEqual(powerline.render(segment_info={}, side='left'), '\x1b[0;38;2;192;0;192;48;2;0;128;128m\xa0s\x1b[0;38;2;0;128;128;49;22m>>\x1b[0m') + + @with_new_config + def test_term_truecolor_fbterm_escapes(self, config): + from powerline.shell import ShellPowerline + import powerline as powerline_module + config['config']['common']['term_escape_style'] = 'fbterm' + config['config']['common']['term_truecolor'] = True + with swap_attributes(config, powerline_module): + with get_powerline_raw(config, ShellPowerline, args=Args(config_path=[''])) as powerline: + self.assertEqual(powerline.render(segment_info={}, side='left'), '\x1b[0m\x1b[1;5}\x1b[2;6}\xa0s\x1b[0m\x1b[1;6}\x1b[49m\x1b[22m>>\x1b[0m') + + @with_new_config + def test_term_truecolor_tmux_escapes(self, config): + from powerline.shell import ShellPowerline + import powerline as powerline_module + config['config']['common']['term_truecolor'] = True + config['config']['common']['additional_escapes'] = 'tmux' + with swap_attributes(config, powerline_module): + with get_powerline_raw(config, ShellPowerline, args=Args(config_path=[''])) as powerline: + self.assertEqual(powerline.render(segment_info={}, side='left'), '\x1bPtmux;\x1b\x1b[0;38;2;192;0;192;48;2;0;128;128m\x1b\\\xa0s\x1bPtmux;\x1b\x1b[0;38;2;0;128;128;49;22m\x1b\\>>\x1bPtmux;\x1b\x1b[0m\x1b\\') + + @with_new_config + def test_term_truecolor_screen_escapes(self, config): + from powerline.shell import ShellPowerline + import powerline as powerline_module + config['config']['common']['term_truecolor'] = True + config['config']['common']['additional_escapes'] = 'screen' + with swap_attributes(config, powerline_module): + with get_powerline_raw(config, ShellPowerline, args=Args(config_path=[''])) as powerline: + self.assertEqual(powerline.render(segment_info={}, side='left'), '\x1bP\x1b\x1b[0;38;2;192;0;192;48;2;0;128;128m\x1b\\\xa0s\x1bP\x1b\x1b[0;38;2;0;128;128;49;22m\x1b\\>>\x1bP\x1b\x1b[0m\x1b\\') + + +class TestVim(TestCase): + def test_environ_update(self): + # Regression test: test that segment obtains environment from vim, not + # from os.environ. + with vim_module._with('globals', powerline_config_paths=['/']): + from powerline.vim import VimPowerline + import powerline as powerline_module + with swap_attributes(config, powerline_module): + with vim_module._with('environ', TEST='abc'): + with get_powerline_raw(config, VimPowerline) as powerline: + window = vim_module.current.window + window_id = 1 + winnr = window.number + self.assertEqual(powerline.render(window, window_id, winnr), b'%#Pl_3_8404992_4_192_underline#\xc2\xa0abc%#Pl_4_192_NONE_None_NONE#>>') + vim_module._environ['TEST'] = 'def' + self.assertEqual(powerline.render(window, window_id, winnr), b'%#Pl_3_8404992_4_192_underline#\xc2\xa0def%#Pl_4_192_NONE_None_NONE#>>') + + def test_local_themes(self): + # Regression test: VimPowerline.add_local_theme did not work properly. + from powerline.vim import VimPowerline + import powerline as powerline_module + with swap_attributes(config, powerline_module): + with get_powerline_raw(config, VimPowerline, replace_gcp=True) as powerline: + powerline.add_local_theme('tests.modules.matchers.always_true', { + 'segment_data': { + 'foo': { + 'contents': '“bar”' + } + }, + 'segments': { + 'left': [ + { + 'type': 'string', + 'name': 'foo', + 'highlight_groups': ['g1'] + } + ] + } + }) + window = vim_module.current.window + window_id = 1 + winnr = window.number + self.assertEqual(powerline.render(window, window_id, winnr), b'%#Pl_5_12583104_6_32896_NONE#\xc2\xa0\xe2\x80\x9cbar\xe2\x80\x9d%#Pl_6_32896_NONE_None_NONE#>>') + + @classmethod + def setUpClass(cls): + sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(os.path.dirname(__file__)), 'vim_sys_path'))) + + @classmethod + def tearDownClass(cls): + sys.path.pop(0) + + +class TestLemonbar(TestRender): + def test_lemonbar(self): + import powerline as powerline_module + with swap_attributes(config, powerline_module): + with get_powerline_raw(config, powerline_module.Powerline, replace_gcp=True, ext='wm', renderer_module='lemonbar') as powerline: + self.assertRenderEqual( + powerline, + '%{l}%{F#ffc00000}%{B#ff008000}%{+u} A%{F-B--u}%{F#ff008000}%{B#ffc00000}>>%{F-B--u}%{F#ff008000}%{B#ffc00000}B%{F-B--u}%{F#ffc00000}>>%{F-B--u}%{r}%{F#ffc00000}<<%{F-B--u}%{F#ff804000}%{B#ffc00000}%{+u}C%{F-B--u}%{F#ff0000c0}%{B#ffc00000}<<%{F-B--u}%{F#ff008000}%{B#ff0000c0}D %{F-B--u}' + ) + + @with_new_config + def test_lemonbar_escape(self, config): + import powerline as powerline_module + config['themes/wm/default']['segments']['left'] = ( + highlighted_string('%{asd}', 'hl1'), + highlighted_string('10% %', 'hl2'), + ) + with swap_attributes(config, powerline_module): + with get_powerline_raw(config, powerline_module.Powerline, replace_gcp=True, ext='wm', renderer_module='lemonbar') as powerline: + self.assertRenderEqual( + powerline, + '%{l}%{F#ffc00000}%{B#ff008000}%{+u} %%{}{asd}%{F-B--u}%{F#ff008000}%{B#ffc00000}>>%{F-B--u}%{F#ff008000}%{B#ffc00000}10%%{} %%{}%{F-B--u}%{F#ffc00000}>>%{F-B--u}%{r}%{F#ffc00000}<<%{F-B--u}%{F#ff804000}%{B#ffc00000}%{+u}C%{F-B--u}%{F#ff0000c0}%{B#ffc00000}<<%{F-B--u}%{F#ff008000}%{B#ff0000c0}D %{F-B--u}' + ) + + +if __name__ == '__main__': + from tests.modules import main + main() diff --git a/tests/test_python/test_lib.py b/tests/test_python/test_lib.py new file mode 100644 index 0000000..6dd6190 --- /dev/null +++ b/tests/test_python/test_lib.py @@ -0,0 +1,733 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import threading +import os +import sys +import re +import shutil +import unicodedata + +from time import sleep +from subprocess import call, PIPE + +from powerline.lib import add_divider_highlight_group +from powerline.lib.dict import mergedicts, REMOVE_THIS_KEY +from powerline.lib.humanize_bytes import humanize_bytes +from powerline.lib.vcs import guess, get_fallback_create_watcher +from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment +from powerline.lib.monotonic import monotonic +from powerline.lib.vcs.git import git_directory +from powerline.lib.shell import run_cmd + +import powerline.lib.unicode as plu + +from tests.modules.lib import Pl, replace_attr +from tests.modules import TestCase, SkipTest + + +try: + __import__('bzrlib') +except ImportError: + use_bzr = False +else: + use_bzr = True + +try: + __import__('hglib') +except ImportError: + use_mercurial = False +else: + use_mercurial = True + + +GIT_REPO = 'git_repo' +HG_REPO = 'hg_repo' +BZR_REPO = 'bzr_repo' + + +def thread_number(): + return len(threading.enumerate()) + + +class TestShell(TestCase): + def test_run_cmd(self): + pl = Pl() + self.assertEqual(run_cmd(pl, ['xxx_nonexistent_command_xxx']), None) + self.assertEqual(len(pl.exceptions), 1) + pl = Pl() + self.assertEqual(run_cmd(pl, ['echo', ' test ']), 'test') + self.assertFalse(pl) + self.assertEqual(run_cmd(pl, ['echo', ' test '], strip=True), 'test') + self.assertFalse(pl) + self.assertEqual(run_cmd(pl, ['echo', ' test '], strip=False), ' test \n') + self.assertFalse(pl) + self.assertEqual(run_cmd(pl, ['cat'], stdin='test'), 'test') + self.assertFalse(pl) + self.assertEqual(run_cmd(pl, ['sh', '-c', 'cat >&2'], stdin='test'), '') + self.assertFalse(pl) + + +class TestThreaded(TestCase): + def test_threaded_segment(self): + log = [] + pl = Pl() + updates = [(None,)] + lock = threading.Lock() + event = threading.Event() + block_event = threading.Event() + + class TestSegment(ThreadedSegment): + interval = 10 + + def set_state(self, **kwargs): + event.clear() + log.append(('set_state', kwargs)) + return super(TestSegment, self).set_state(**kwargs) + + def update(self, update_value): + block_event.wait() + event.set() + # Make sleep first to prevent some race conditions + log.append(('update', update_value)) + with lock: + ret = updates[0] + if isinstance(ret, Exception): + raise ret + else: + return ret[0] + + def render(self, update, **kwargs): + log.append(('render', update, kwargs)) + if isinstance(update, Exception): + raise update + else: + return update + + # Non-threaded tests + segment = TestSegment() + block_event.set() + updates[0] = (None,) + self.assertEqual(segment(pl=pl), None) + self.assertEqual(thread_number(), 1) + self.assertEqual(log, [ + ('set_state', {}), + ('update', None), + ('render', None, {'pl': pl, 'update_first': True}), + ]) + log[:] = () + + segment = TestSegment() + block_event.set() + updates[0] = ('abc',) + self.assertEqual(segment(pl=pl), 'abc') + self.assertEqual(thread_number(), 1) + self.assertEqual(log, [ + ('set_state', {}), + ('update', None), + ('render', 'abc', {'pl': pl, 'update_first': True}), + ]) + log[:] = () + + segment = TestSegment() + block_event.set() + updates[0] = ('abc',) + self.assertEqual(segment(pl=pl, update_first=False), 'abc') + self.assertEqual(thread_number(), 1) + self.assertEqual(log, [ + ('set_state', {}), + ('update', None), + ('render', 'abc', {'pl': pl, 'update_first': False}), + ]) + log[:] = () + + segment = TestSegment() + block_event.set() + updates[0] = ValueError('abc') + self.assertEqual(segment(pl=pl), None) + self.assertEqual(thread_number(), 1) + self.assertEqual(len(pl.exceptions), 1) + self.assertEqual(log, [ + ('set_state', {}), + ('update', None), + ]) + log[:] = () + pl.exceptions[:] = () + + segment = TestSegment() + block_event.set() + updates[0] = (TypeError('def'),) + self.assertRaises(TypeError, segment, pl=pl) + self.assertEqual(thread_number(), 1) + self.assertEqual(log, [ + ('set_state', {}), + ('update', None), + ('render', updates[0][0], {'pl': pl, 'update_first': True}), + ]) + log[:] = () + + # Threaded tests + segment = TestSegment() + block_event.clear() + kwargs = {'pl': pl, 'update_first': False, 'other': 1} + with lock: + updates[0] = ('abc',) + segment.startup(**kwargs) + ret = segment(**kwargs) + self.assertEqual(thread_number(), 2) + block_event.set() + event.wait() + segment.shutdown_event.set() + segment.thread.join() + self.assertEqual(ret, None) + self.assertEqual(log, [ + ('set_state', {'update_first': False, 'other': 1}), + ('render', None, {'pl': pl, 'update_first': False, 'other': 1}), + ('update', None), + ]) + log[:] = () + + segment = TestSegment() + block_event.set() + kwargs = {'pl': pl, 'update_first': True, 'other': 1} + with lock: + updates[0] = ('def',) + segment.startup(**kwargs) + ret = segment(**kwargs) + self.assertEqual(thread_number(), 2) + segment.shutdown_event.set() + segment.thread.join() + self.assertEqual(ret, 'def') + self.assertEqual(log, [ + ('set_state', {'update_first': True, 'other': 1}), + ('update', None), + ('render', 'def', {'pl': pl, 'update_first': True, 'other': 1}), + ]) + log[:] = () + + segment = TestSegment() + block_event.set() + kwargs = {'pl': pl, 'update_first': True, 'interval': 0.2} + with lock: + updates[0] = ('abc',) + segment.startup(**kwargs) + start = monotonic() + ret1 = segment(**kwargs) + with lock: + updates[0] = ('def',) + self.assertEqual(thread_number(), 2) + sleep(0.5) + ret2 = segment(**kwargs) + segment.shutdown_event.set() + segment.thread.join() + end = monotonic() + duration = end - start + self.assertEqual(ret1, 'abc') + self.assertEqual(ret2, 'def') + self.assertEqual(log[:5], [ + ('set_state', {'update_first': True, 'interval': 0.2}), + ('update', None), + ('render', 'abc', {'pl': pl, 'update_first': True, 'interval': 0.2}), + ('update', 'abc'), + ('update', 'def'), + ]) + num_runs = len([e for e in log if e[0] == 'update']) + self.assertAlmostEqual(duration / 0.2, num_runs, delta=1) + log[:] = () + + segment = TestSegment() + block_event.set() + kwargs = {'pl': pl, 'update_first': True, 'interval': 0.2} + with lock: + updates[0] = ('ghi',) + segment.startup(**kwargs) + start = monotonic() + ret1 = segment(**kwargs) + with lock: + updates[0] = TypeError('jkl') + self.assertEqual(thread_number(), 2) + sleep(0.5) + ret2 = segment(**kwargs) + segment.shutdown_event.set() + segment.thread.join() + end = monotonic() + duration = end - start + self.assertEqual(ret1, 'ghi') + self.assertEqual(ret2, None) + self.assertEqual(log[:5], [ + ('set_state', {'update_first': True, 'interval': 0.2}), + ('update', None), + ('render', 'ghi', {'pl': pl, 'update_first': True, 'interval': 0.2}), + ('update', 'ghi'), + ('update', 'ghi'), + ]) + num_runs = len([e for e in log if e[0] == 'update']) + self.assertAlmostEqual(duration / 0.2, num_runs, delta=1) + self.assertEqual(num_runs - 1, len(pl.exceptions)) + log[:] = () + + def test_kw_threaded_segment(self): + log = [] + pl = Pl() + event = threading.Event() + + class TestSegment(KwThreadedSegment): + interval = 10 + + @staticmethod + def key(_key=(None,), **kwargs): + log.append(('key', _key, kwargs)) + return _key + + def compute_state(self, key): + event.set() + sleep(0.1) + log.append(('compute_state', key)) + ret = key + if isinstance(ret, Exception): + raise ret + else: + return ret[0] + + def render_one(self, state, **kwargs): + log.append(('render_one', state, kwargs)) + if isinstance(state, Exception): + raise state + else: + return state + + # Non-threaded tests + segment = TestSegment() + event.clear() + self.assertEqual(segment(pl=pl), None) + self.assertEqual(thread_number(), 1) + self.assertEqual(log, [ + ('key', (None,), {'pl': pl}), + ('compute_state', (None,)), + ('render_one', None, {'pl': pl}), + ]) + log[:] = () + + segment = TestSegment() + kwargs = {'pl': pl, '_key': ('abc',), 'update_first': False} + event.clear() + self.assertEqual(segment(**kwargs), 'abc') + kwargs.update(_key=('def',)) + self.assertEqual(segment(**kwargs), 'def') + self.assertEqual(thread_number(), 1) + self.assertEqual(log, [ + ('key', ('abc',), {'pl': pl}), + ('compute_state', ('abc',)), + ('render_one', 'abc', {'pl': pl, '_key': ('abc',)}), + ('key', ('def',), {'pl': pl}), + ('compute_state', ('def',)), + ('render_one', 'def', {'pl': pl, '_key': ('def',)}), + ]) + log[:] = () + + segment = TestSegment() + kwargs = {'pl': pl, '_key': ValueError('xyz'), 'update_first': False} + event.clear() + self.assertEqual(segment(**kwargs), None) + self.assertEqual(thread_number(), 1) + self.assertEqual(log, [ + ('key', kwargs['_key'], {'pl': pl}), + ('compute_state', kwargs['_key']), + ]) + log[:] = () + + segment = TestSegment() + kwargs = {'pl': pl, '_key': (ValueError('abc'),), 'update_first': False} + event.clear() + self.assertRaises(ValueError, segment, **kwargs) + self.assertEqual(thread_number(), 1) + self.assertEqual(log, [ + ('key', kwargs['_key'], {'pl': pl}), + ('compute_state', kwargs['_key']), + ('render_one', kwargs['_key'][0], {'pl': pl, '_key': kwargs['_key']}), + ]) + log[:] = () + + # Threaded tests + segment = TestSegment() + kwargs = {'pl': pl, 'update_first': False, '_key': ('_abc',)} + event.clear() + segment.startup(**kwargs) + ret = segment(**kwargs) + self.assertEqual(thread_number(), 2) + segment.shutdown_event.set() + segment.thread.join() + self.assertEqual(ret, None) + self.assertEqual(log[:2], [ + ('key', kwargs['_key'], {'pl': pl}), + ('render_one', None, {'pl': pl, '_key': kwargs['_key']}), + ]) + self.assertLessEqual(len(log), 3) + if len(log) > 2: + self.assertEqual(log[2], ('compute_state', kwargs['_key'])) + log[:] = () + + segment = TestSegment() + kwargs = {'pl': pl, 'update_first': True, '_key': ('_abc',)} + event.clear() + segment.startup(**kwargs) + ret1 = segment(**kwargs) + kwargs.update(_key=('_def',)) + ret2 = segment(**kwargs) + self.assertEqual(thread_number(), 2) + segment.shutdown_event.set() + segment.thread.join() + self.assertEqual(ret1, '_abc') + self.assertEqual(ret2, '_def') + self.assertEqual(log, [ + ('key', ('_abc',), {'pl': pl}), + ('compute_state', ('_abc',)), + ('render_one', '_abc', {'pl': pl, '_key': ('_abc',)}), + ('key', ('_def',), {'pl': pl}), + ('compute_state', ('_def',)), + ('render_one', '_def', {'pl': pl, '_key': ('_def',)}), + ]) + log[:] = () + + +class TestLib(TestCase): + def test_mergedicts(self): + d = {} + mergedicts(d, {'abc': {'def': 'ghi'}}) + self.assertEqual(d, {'abc': {'def': 'ghi'}}) + mergedicts(d, {'abc': {'def': {'ghi': 'jkl'}}}) + self.assertEqual(d, {'abc': {'def': {'ghi': 'jkl'}}}) + mergedicts(d, {}) + self.assertEqual(d, {'abc': {'def': {'ghi': 'jkl'}}}) + mergedicts(d, {'abc': {'mno': 'pqr'}}) + self.assertEqual(d, {'abc': {'def': {'ghi': 'jkl'}, 'mno': 'pqr'}}) + mergedicts(d, {'abc': {'def': REMOVE_THIS_KEY}}) + self.assertEqual(d, {'abc': {'mno': 'pqr'}}) + + def test_add_divider_highlight_group(self): + def decorated_function_name(**kwargs): + return str(kwargs) + func = add_divider_highlight_group('hl_group')(decorated_function_name) + self.assertEqual(func.__name__, 'decorated_function_name') + self.assertEqual(func(kw={}), [{'contents': repr({str('kw'): {}}), 'divider_highlight_group': 'hl_group'}]) + + def test_humanize_bytes(self): + self.assertEqual(humanize_bytes(0), '0 B') + self.assertEqual(humanize_bytes(1), '1 B') + self.assertEqual(humanize_bytes(1, suffix='bit'), '1 bit') + self.assertEqual(humanize_bytes(1000, si_prefix=True), '1 kB') + self.assertEqual(humanize_bytes(1024, si_prefix=True), '1 kB') + self.assertEqual(humanize_bytes(1000000000, si_prefix=True), '1.00 GB') + self.assertEqual(humanize_bytes(1000000000, si_prefix=False), '953.7 MiB') + + +width_data = { + 'N': 1, # Neutral + 'Na': 1, # Narrow + 'A': 1, # Ambigious + 'H': 1, # Half-width + 'W': 2, # Wide + 'F': 2, # Fullwidth +} + + +class TestUnicode(TestCase): + def assertStringsIdentical(self, s1, s2): + self.assertTrue(type(s1) is type(s2), msg='string types differ') + self.assertEqual(s1, s2) + + def test_unicode(self): + self.assertTrue(type('abc') is plu.unicode) + + def test_unichr(self): + self.assertStringsIdentical('\U0010FFFF', plu.unichr(0x10FFFF)) + self.assertStringsIdentical('\uFFFF', plu.unichr(0xFFFF)) + self.assertStringsIdentical('\x20', plu.unichr(0x20)) + + def test_u(self): + self.assertStringsIdentical('Test', plu.u('Test')) + self.assertStringsIdentical('Test', plu.u(b'Test')) + self.assertStringsIdentical('«»', plu.u(b'\xC2\xAB\xC2\xBB')) + self.assertRaises(UnicodeDecodeError, plu.u, b'\xFF') + + def test_tointiter(self): + self.assertEqual([1, 2, 3], list(plu.tointiter(b'\x01\x02\x03'))) + + def test_decode_error(self): + self.assertStringsIdentical('<FF>', b'\xFF'.decode('utf-8', 'powerline_decode_error')) + self.assertStringsIdentical('abc', b'abc'.decode('utf-8', 'powerline_decode_error')) + + def test_register_strwidth_error(self): + ename = plu.register_strwidth_error(lambda s: 3) + self.assertStringsIdentical(b'???', 'A'.encode('latin1', ename)) + self.assertStringsIdentical(b'abc', 'abc'.encode('latin1', ename)) + + def test_out_u(self): + self.assertStringsIdentical('abc', plu.out_u('abc')) + self.assertStringsIdentical('abc', plu.out_u(b'abc')) + self.assertRaises(TypeError, plu.out_u, None) + + def test_safe_unicode(self): + self.assertStringsIdentical('abc', plu.safe_unicode('abc')) + self.assertStringsIdentical('abc', plu.safe_unicode(b'abc')) + self.assertStringsIdentical('«»', plu.safe_unicode(b'\xc2\xab\xc2\xbb')) + with replace_attr(plu, 'get_preferred_output_encoding', lambda: 'latin1'): + self.assertStringsIdentical('ÿ', plu.safe_unicode(b'\xFF')) + self.assertStringsIdentical('None', plu.safe_unicode(None)) + + class FailingStr(object): + def __str__(self): + raise NotImplementedError('Fail!') + + self.assertStringsIdentical('Fail!', plu.safe_unicode(FailingStr())) + + def test_FailedUnicode(self): + self.assertTrue(isinstance(plu.FailedUnicode('abc'), plu.unicode)) + self.assertEqual('abc', plu.FailedUnicode('abc')) + + def test_string(self): + self.assertStringsIdentical(str('abc'), plu.string('abc')) + self.assertStringsIdentical(str('abc'), plu.string(b'abc')) + + def test_surrogate_pair_to_character(self): + self.assertEqual(0x1F48E, plu.surrogate_pair_to_character(0xD83D, 0xDC8E)) + + def test_strwidth_ucs_4(self): + self.assertEqual(4, plu.strwidth_ucs_4(width_data, 'abcd')) + self.assertEqual(4, plu.strwidth_ucs_4(width_data, 'AB')) + if sys.maxunicode < 0x10FFFF: + raise SkipTest('Can only test strwidth_ucs_4 in UCS-4 Pythons') + + self.assertEqual(1, plu.strwidth_ucs_4(width_data, '\U0001F063')) + + def test_strwidth_ucs_2(self): + self.assertEqual(4, plu.strwidth_ucs_2(width_data, 'abcd')) + self.assertEqual(4, plu.strwidth_ucs_2(width_data, 'AB')) + if not sys.maxunicode < 0x10FFFF: + raise SkipTest('Can only test strwidth_ucs_2 in UCS-2 Pythons') + self.assertEqual(1, plu.strwidth_ucs_2(width_data, '\ud83c\udc30')) + + +class TestVCS(TestCase): + def do_branch_rename_test(self, repo, q): + st = monotonic() + while monotonic() - st < 1: + # Give inotify time to deliver events + ans = repo.branch() + if hasattr(q, '__call__'): + if q(ans): + break + else: + if ans == q: + break + sleep(0.01) + if hasattr(q, '__call__'): + self.assertTrue(q(ans)) + else: + self.assertEqual(ans, q) + + def test_git(self): + create_watcher = get_fallback_create_watcher() + repo = guess(path=GIT_REPO, create_watcher=create_watcher) + self.assertNotEqual(repo, None) + self.assertEqual(repo.branch(), 'master') + self.assertEqual(repo.status(), None) + self.assertEqual(repo.status('file'), None) + with open(os.path.join(GIT_REPO, 'file'), 'w') as f: + f.write('abc') + f.flush() + self.assertEqual(repo.status(), ' U') + self.assertEqual(repo.status('file'), '??') + call(['git', 'add', '.'], cwd=GIT_REPO) + self.assertEqual(repo.status(), ' I ') + self.assertEqual(repo.status('file'), 'A ') + f.write('def') + f.flush() + self.assertEqual(repo.status(), 'DI ') + self.assertEqual(repo.status('file'), 'AM') + os.remove(os.path.join(GIT_REPO, 'file')) + # Test changing branch + self.assertEqual(repo.branch(), 'master') + try: + call(['git', 'branch', 'branch1'], cwd=GIT_REPO) + call(['git', 'checkout', '-q', 'branch1'], cwd=GIT_REPO) + self.do_branch_rename_test(repo, 'branch1') + call(['git', 'branch', 'branch2'], cwd=GIT_REPO) + call(['git', 'checkout', '-q', 'branch2'], cwd=GIT_REPO) + self.do_branch_rename_test(repo, 'branch2') + call(['git', 'checkout', '-q', '--detach', 'branch1'], cwd=GIT_REPO) + self.do_branch_rename_test(repo, lambda b: re.match(r'^[a-f0-9]+$', b)) + finally: + call(['git', 'checkout', '-q', 'master'], cwd=GIT_REPO) + # Test stashing + self.assertEqual(repo.stash(), 0) + + def stash_save(): + with open(os.path.join(GIT_REPO, 'file'), 'w') as f: + f.write('abc') + return call(['git', 'stash', '-u'], cwd=GIT_REPO, stdout=PIPE) + + def stash_drop(): + return call(['git', 'stash', 'drop'], cwd=GIT_REPO, stdout=PIPE) + + def stash_list(): + return call(['git', 'stash', 'list'], cwd=GIT_REPO, stdout=PIPE) + + try: + stash_save() + self.assertEqual(repo.stash(), 1) + stash_save() + self.assertEqual(repo.stash(), 2) + stash_drop() + self.assertEqual(repo.stash(), 1) + stash_drop() + self.assertEqual(repo.stash(), 0) + finally: + while stash_list(): + stash_drop() + + def test_git_sym(self): + create_watcher = get_fallback_create_watcher() + dotgit = os.path.join(GIT_REPO, '.git') + spacegit = os.path.join(GIT_REPO, ' .git ') + os.rename(dotgit, spacegit) + try: + with open(dotgit, 'w') as F: + F.write('gitdir: .git \n') + gitdir = git_directory(GIT_REPO) + self.assertTrue(os.path.isdir(gitdir)) + self.assertEqual(gitdir, os.path.abspath(spacegit)) + repo = guess(path=GIT_REPO, create_watcher=create_watcher) + self.assertEqual(repo.branch(), 'master') + finally: + os.remove(dotgit) + os.rename(spacegit, dotgit) + + def test_mercurial(self): + if not use_mercurial: + raise SkipTest('Mercurial is not available') + create_watcher = get_fallback_create_watcher() + repo = guess(path=HG_REPO, create_watcher=create_watcher) + self.assertNotEqual(repo, None) + self.assertEqual(repo.branch(), 'default') + self.assertEqual(repo.status(), None) + with open(os.path.join(HG_REPO, 'file'), 'w') as f: + f.write('abc') + f.flush() + self.assertEqual(repo.status(), ' U') + self.assertEqual(repo.status('file'), 'U') + call(['hg', 'add', '.'], cwd=HG_REPO, stdout=PIPE) + self.assertEqual(repo.status(), 'D ') + self.assertEqual(repo.status('file'), 'A') + os.remove(os.path.join(HG_REPO, 'file')) + + def test_bzr(self): + if not use_bzr: + raise SkipTest('Bazaar is not available') + create_watcher = get_fallback_create_watcher() + repo = guess(path=BZR_REPO, create_watcher=create_watcher) + self.assertNotEqual(repo, None, 'No bzr repo found. Do you have bzr installed?') + self.assertEqual(repo.branch(), 'test_powerline') + self.assertEqual(repo.status(), None) + with open(os.path.join(BZR_REPO, 'file'), 'w') as f: + f.write('abc') + self.assertEqual(repo.status(), ' U') + self.assertEqual(repo.status('file'), '? ') + call(['bzr', 'add', '-q', '.'], cwd=BZR_REPO, stdout=PIPE) + self.assertEqual(repo.status(), 'D ') + self.assertEqual(repo.status('file'), '+N') + call(['bzr', 'commit', '-q', '-m', 'initial commit'], cwd=BZR_REPO) + self.assertEqual(repo.status(), None) + with open(os.path.join(BZR_REPO, 'file'), 'w') as f: + f.write('def') + self.assertEqual(repo.status(), 'D ') + self.assertEqual(repo.status('file'), ' M') + self.assertEqual(repo.status('notexist'), None) + with open(os.path.join(BZR_REPO, 'ignored'), 'w') as f: + f.write('abc') + self.assertEqual(repo.status('ignored'), '? ') + # Test changing the .bzrignore file should update status + with open(os.path.join(BZR_REPO, '.bzrignore'), 'w') as f: + f.write('ignored') + self.assertEqual(repo.status('ignored'), None) + # Test changing the dirstate file should invalidate the cache for + # all files in the repo + with open(os.path.join(BZR_REPO, 'file2'), 'w') as f: + f.write('abc') + call(['bzr', 'add', 'file2'], cwd=BZR_REPO, stdout=PIPE) + call(['bzr', 'commit', '-q', '-m', 'file2 added'], cwd=BZR_REPO) + with open(os.path.join(BZR_REPO, 'file'), 'a') as f: + f.write('hello') + with open(os.path.join(BZR_REPO, 'file2'), 'a') as f: + f.write('hello') + self.assertEqual(repo.status('file'), ' M') + self.assertEqual(repo.status('file2'), ' M') + call(['bzr', 'commit', '-q', '-m', 'multi'], cwd=BZR_REPO) + self.assertEqual(repo.status('file'), None) + self.assertEqual(repo.status('file2'), None) + + # Test changing branch + call(['bzr', 'nick', 'branch1'], cwd=BZR_REPO, stdout=PIPE, stderr=PIPE) + self.do_branch_rename_test(repo, 'branch1') + + # Test branch name/status changes when swapping repos + for x in ('b1', 'b2'): + d = os.path.join(BZR_REPO, x) + os.mkdir(d) + call(['bzr', 'init', '-q'], cwd=d) + call(['bzr', 'nick', '-q', x], cwd=d) + repo = guess(path=d, create_watcher=create_watcher) + self.assertEqual(repo.branch(), x) + self.assertFalse(repo.status()) + if x == 'b1': + open(os.path.join(d, 'dirty'), 'w').close() + self.assertTrue(repo.status()) + os.rename(os.path.join(BZR_REPO, 'b1'), os.path.join(BZR_REPO, 'b')) + os.rename(os.path.join(BZR_REPO, 'b2'), os.path.join(BZR_REPO, 'b1')) + os.rename(os.path.join(BZR_REPO, 'b'), os.path.join(BZR_REPO, 'b2')) + for x, y in (('b1', 'b2'), ('b2', 'b1')): + d = os.path.join(BZR_REPO, x) + repo = guess(path=d, create_watcher=create_watcher) + self.do_branch_rename_test(repo, y) + if x == 'b1': + self.assertFalse(repo.status()) + else: + self.assertTrue(repo.status()) + + @classmethod + def setUpClass(cls): + cls.powerline_old_cwd = os.getcwd() + os.chdir(os.path.dirname(os.path.dirname(__file__))) + call(['git', 'init', '--quiet', GIT_REPO]) + assert os.path.isdir(GIT_REPO) + call(['git', 'config', '--local', 'user.name', 'Foo'], cwd=GIT_REPO) + call(['git', 'config', '--local', 'user.email', 'bar@example.org'], cwd=GIT_REPO) + call(['git', 'commit', '--allow-empty', '--message', 'Initial commit', '--quiet'], cwd=GIT_REPO) + if use_mercurial: + cls.powerline_old_HGRCPATH = os.environ.get('HGRCPATH') + os.environ['HGRCPATH'] = '' + call(['hg', 'init', HG_REPO]) + with open(os.path.join(HG_REPO, '.hg', 'hgrc'), 'w') as hgrc: + hgrc.write('[ui]\n') + hgrc.write('username = Foo <bar@example.org>\n') + if use_bzr: + call(['bzr', 'init', '--quiet', BZR_REPO]) + call(['bzr', 'config', 'email=Foo <bar@example.org>'], cwd=BZR_REPO) + call(['bzr', 'config', 'nickname=test_powerline'], cwd=BZR_REPO) + call(['bzr', 'config', 'create_signatures=0'], cwd=BZR_REPO) + + @classmethod + def tearDownClass(cls): + for repo_dir in [GIT_REPO] + ([HG_REPO] if use_mercurial else []) + ([BZR_REPO] if use_bzr else []): + shutil.rmtree(repo_dir) + if use_mercurial: + if cls.powerline_old_HGRCPATH is None: + os.environ.pop('HGRCPATH') + else: + os.environ['HGRCPATH'] = cls.powerline_old_HGRCPATH + os.chdir(cls.powerline_old_cwd) + + +if __name__ == '__main__': + from tests.modules import main + main() diff --git a/tests/test_python/test_lib_config.py b/tests/test_python/test_lib_config.py new file mode 100644 index 0000000..053462a --- /dev/null +++ b/tests/test_python/test_lib_config.py @@ -0,0 +1,52 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import os + +from powerline.lib.config import ConfigLoader + +from tests.modules import TestCase +from tests.modules.lib.fsconfig import FSTree + + +FILE_ROOT = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'cfglib') + + +class LoadedList(list): + def pop_all(self): + try: + return self[:] + finally: + self[:] = () + + +loaded = LoadedList() + + +def on_load(key): + loaded.append(key) + + +def check_file(path): + if os.path.exists(path): + return path + else: + raise IOError + + +class TestLoaderCondition(TestCase): + def test_update_missing(self): + loader = ConfigLoader(run_once=True) + fpath = os.path.join(FILE_ROOT, 'file.json') + self.assertRaises(IOError, loader.load, fpath) + loader.register_missing(check_file, on_load, fpath) + loader.update() # This line must not raise IOError + with FSTree({'file': {'test': 1}}, root=FILE_ROOT): + loader.update() + self.assertEqual(loader.load(fpath), {'test': 1}) + self.assertEqual(loaded.pop_all(), [fpath]) + + +if __name__ == '__main__': + from tests.modules import main + main() diff --git a/tests/test_python/test_listers.py b/tests/test_python/test_listers.py new file mode 100644 index 0000000..a33f033 --- /dev/null +++ b/tests/test_python/test_listers.py @@ -0,0 +1,227 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import powerline.listers.i3wm as i3wm + +from tests.modules.lib import Args, replace_attr, Pl +from tests.modules import TestCase + + +class TestI3WM(TestCase): + @staticmethod + def get_workspaces(): + return iter([ + {'name': '1: w1', 'output': 'LVDS1', 'focused': False, 'urgent': False, 'visible': False}, + {'name': '2: w2', 'output': 'LVDS1', 'focused': False, 'urgent': False, 'visible': True}, + {'name': '3: w3', 'output': 'HDMI1', 'focused': False, 'urgent': True, 'visible': True}, + {'name': '4: w4', 'output': 'DVI01', 'focused': True, 'urgent': True, 'visible': True}, + ]) + + @staticmethod + def get_outputs(pl): + return iter([ + {'name': 'LVDS1'}, + {'name': 'HDMI1'}, + {'name': 'DVI01'}, + ]) + + def test_output_lister(self): + pl = Pl() + with replace_attr(i3wm, 'get_connected_xrandr_outputs', self.get_outputs): + self.assertEqual( + list(i3wm.output_lister(pl=pl, segment_info={'a': 1})), + [ + ({'a': 1, 'output': 'LVDS1'}, {'draw_inner_divider': None}), + ({'a': 1, 'output': 'HDMI1'}, {'draw_inner_divider': None}), + ({'a': 1, 'output': 'DVI01'}, {'draw_inner_divider': None}), + ] + ) + + def test_workspace_lister(self): + pl = Pl() + with replace_attr(i3wm, 'get_i3_connection', lambda: Args(get_workspaces=self.get_workspaces)): + self.assertEqual( + list(i3wm.workspace_lister(pl=pl, segment_info={'a': 1})), + [ + ({ + 'a': 1, + 'output': 'LVDS1', + 'workspace': { + 'name': '1: w1', + 'focused': False, + 'urgent': False, + 'visible': False + } + }, {'draw_inner_divider': None}), + ({ + 'a': 1, + 'output': 'LVDS1', + 'workspace': { + 'name': '2: w2', + 'focused': False, + 'urgent': False, + 'visible': True + } + }, {'draw_inner_divider': None}), + ({ + 'a': 1, + 'output': 'HDMI1', + 'workspace': { + 'name': '3: w3', + 'focused': False, + 'urgent': True, + 'visible': True + } + }, {'draw_inner_divider': None}), + ({ + 'a': 1, + 'output': 'DVI01', + 'workspace': { + 'name': '4: w4', + 'focused': True, + 'urgent': True, + 'visible': True + } + }, {'draw_inner_divider': None}), + ] + ) + + self.assertEqual( + list(i3wm.workspace_lister(pl=pl, segment_info={'a': 1}, output='LVDS1')), + [ + ({ + 'a': 1, + 'output': 'LVDS1', + 'workspace': { + 'name': '1: w1', + 'focused': False, + 'urgent': False, + 'visible': False + } + }, {'draw_inner_divider': None}), + ({ + 'a': 1, + 'output': 'LVDS1', + 'workspace': { + 'name': '2: w2', + 'focused': False, + 'urgent': False, + 'visible': True + } + }, {'draw_inner_divider': None}), + ] + ) + + self.assertEqual( + list(i3wm.workspace_lister( + pl=pl, + segment_info={'a': 1, 'output': 'LVDS1'} + )), + [ + ({ + 'a': 1, + 'output': 'LVDS1', + 'workspace': { + 'name': '1: w1', + 'focused': False, + 'urgent': False, + 'visible': False + } + }, {'draw_inner_divider': None}), + ({ + 'a': 1, + 'output': 'LVDS1', + 'workspace': { + 'name': '2: w2', + 'focused': False, + 'urgent': False, + 'visible': True + } + }, {'draw_inner_divider': None}), + ] + ) + + self.assertEqual( + list(i3wm.workspace_lister( + pl=pl, + segment_info={'a': 1, 'output': 'LVDS1'}, + output=False + )), + [ + ({ + 'a': 1, + 'output': 'LVDS1', + 'workspace': { + 'name': '1: w1', + 'focused': False, + 'urgent': False, + 'visible': False + } + }, {'draw_inner_divider': None}), + ({ + 'a': 1, + 'output': 'LVDS1', + 'workspace': { + 'name': '2: w2', + 'focused': False, + 'urgent': False, + 'visible': True + } + }, {'draw_inner_divider': None}), + ({ + 'a': 1, + 'output': 'HDMI1', + 'workspace': { + 'name': '3: w3', + 'focused': False, + 'urgent': True, + 'visible': True + } + }, {'draw_inner_divider': None}), + ({ + 'a': 1, + 'output': 'DVI01', + 'workspace': { + 'name': '4: w4', + 'focused': True, + 'urgent': True, + 'visible': True + } + }, {'draw_inner_divider': None}), + ] + ) + + self.assertEqual( + list(i3wm.workspace_lister( + pl=pl, + segment_info={'a': 1}, + only_show=['focused', 'urgent'] + )), + [ + ({ + 'a': 1, + 'output': 'HDMI1', + 'workspace': { + 'name': '3: w3', + 'focused': False, + 'urgent': True, + 'visible': True + } + }, {'draw_inner_divider': None}), + ({ + 'a': 1, + 'output': 'DVI01', + 'workspace': { + 'name': '4: w4', + 'focused': True, + 'urgent': True, + 'visible': True + } + }, {'draw_inner_divider': None}), + ] + ) + + +if __name__ == '__main__': + from tests.modules import main + main() diff --git a/tests/test_python/test_logging.py b/tests/test_python/test_logging.py new file mode 100644 index 0000000..d7cfe4a --- /dev/null +++ b/tests/test_python/test_logging.py @@ -0,0 +1,467 @@ +# vim:fileencoding=utf-8:noet + +'''Tests for various logging features''' + +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import sys +import re +import codecs +import os + +from io import StringIO +from shutil import rmtree + +from powerline import finish_common_config, create_logger + +from tests.modules import TestCase +from tests.modules.lib import replace_attr + + +TIMESTAMP_RE = r'\d{4}-\d\d-\d\d \d\d:\d\d:\d\d,\d{3}' + + +class TestRE(TestCase): + def assertMatches(self, text, regexp): + self.assertTrue( + re.match(regexp, text), + '{0!r} did not match {1!r}'.format(text, regexp), + ) + + +def close_handlers(logger): + for handler in logger.handlers: + handler.close() + + +class TestHandlers(TestRE): + def test_stderr_handler_is_default(self): + out = StringIO() + err = StringIO() + + with replace_attr(sys, 'stdout', out, 'stderr', err): + common_config = finish_common_config('utf-8', {}) + logger, pl, get_module_attr = create_logger(common_config) + pl.error('Foo') + close_handlers(logger) + self.assertMatches(err.getvalue(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$') + self.assertEqual(out.getvalue(), '') + + def test_stream_override(self): + out = StringIO() + err = StringIO() + stream = StringIO() + + with replace_attr(sys, 'stdout', out, 'stderr', err): + common_config = finish_common_config('utf-8', {}) + logger, pl, get_module_attr = create_logger(common_config, stream=stream) + pl.error('Foo') + close_handlers(logger) + self.assertMatches(stream.getvalue(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$') + self.assertEqual(err.getvalue(), '') + self.assertEqual(out.getvalue(), '') + + def test_explicit_none(self): + out = StringIO() + err = StringIO() + stream = StringIO() + + with replace_attr(sys, 'stdout', out, 'stderr', err): + common_config = finish_common_config('utf-8', {'log_file': [None]}) + logger, pl, get_module_attr = create_logger(common_config, stream=stream) + pl.error('Foo') + close_handlers(logger) + self.assertMatches(stream.getvalue(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$') + self.assertEqual(err.getvalue(), '') + self.assertEqual(out.getvalue(), '') + + def test_explicit_stream_handler(self): + out = StringIO() + err = StringIO() + stream = StringIO() + + with replace_attr(sys, 'stdout', out, 'stderr', err): + common_config = finish_common_config('utf-8', {'log_file': [['logging.StreamHandler', [[]]]]}) + logger, pl, get_module_attr = create_logger(common_config, stream=stream) + pl.error('Foo') + close_handlers(logger) + self.assertEqual(stream.getvalue(), '') + self.assertMatches(err.getvalue(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$') + self.assertEqual(out.getvalue(), '') + + def test_explicit_stream_handler_implicit_stream(self): + out = StringIO() + err = StringIO() + stream = StringIO() + + with replace_attr(sys, 'stdout', out, 'stderr', err): + common_config = finish_common_config('utf-8', {'log_file': [['logging.StreamHandler', []]]}) + logger, pl, get_module_attr = create_logger(common_config, stream=stream) + pl.error('Foo') + close_handlers(logger) + self.assertMatches(stream.getvalue(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$') + self.assertEqual(err.getvalue(), '') + self.assertEqual(out.getvalue(), '') + + def test_file_handler(self): + out = StringIO() + err = StringIO() + stream = StringIO() + file_name = 'test_logging-test_file_handler' + + with replace_attr(sys, 'stdout', out, 'stderr', err): + common_config = finish_common_config('utf-8', {'log_file': file_name}) + try: + logger, pl, get_module_attr = create_logger(common_config, stream=stream) + pl.error('Foo') + close_handlers(logger) + with codecs.open(file_name, encoding='utf-8') as fp: + self.assertMatches(fp.read(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$') + finally: + os.unlink(file_name) + self.assertEqual(stream.getvalue(), '') + self.assertEqual(err.getvalue(), '') + self.assertEqual(out.getvalue(), '') + + def test_file_handler_create_dir(self): + out = StringIO() + err = StringIO() + stream = StringIO() + file_name = 'test_logging-test_file_handler_create_dir/file' + + self.assertFalse(os.path.isdir(os.path.dirname(file_name))) + + with replace_attr(sys, 'stdout', out, 'stderr', err): + common_config = finish_common_config('utf-8', {'log_file': file_name}) + try: + logger, pl, get_module_attr = create_logger(common_config, stream=stream) + pl.error('Foo') + close_handlers(logger) + self.assertTrue(os.path.isdir(os.path.dirname(file_name))) + with codecs.open(file_name, encoding='utf-8') as fp: + self.assertMatches(fp.read(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$') + finally: + rmtree(os.path.dirname(file_name)) + self.assertEqual(stream.getvalue(), '') + self.assertEqual(err.getvalue(), '') + self.assertEqual(out.getvalue(), '') + + def test_multiple_files(self): + out = StringIO() + err = StringIO() + stream = StringIO() + file_name_1 = 'test_logging-test_multiple_files-1' + file_name_2 = file_name_1[:-1] + '2' + + with replace_attr(sys, 'stdout', out, 'stderr', err): + common_config = finish_common_config('utf-8', {'log_file': [file_name_1, file_name_2]}) + try: + try: + logger, pl, get_module_attr = create_logger(common_config, stream=stream) + pl.error('Foo') + close_handlers(logger) + for file_name in (file_name_1, file_name_2): + with codecs.open(file_name, encoding='utf-8') as fp: + self.assertMatches(fp.read(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$') + finally: + os.unlink(file_name_1) + finally: + os.unlink(file_name_2) + self.assertEqual(stream.getvalue(), '') + self.assertEqual(err.getvalue(), '') + self.assertEqual(out.getvalue(), '') + + def test_multiple_files_and_stream(self): + out = StringIO() + err = StringIO() + stream = StringIO() + file_name_1 = 'test_logging-test_multiple_files_and_stream-1' + file_name_2 = file_name_1[:-1] + '2' + + with replace_attr(sys, 'stdout', out, 'stderr', err): + common_config = finish_common_config('utf-8', {'log_file': [file_name_1, file_name_2, None]}) + try: + try: + logger, pl, get_module_attr = create_logger(common_config, stream=stream) + pl.error('Foo') + close_handlers(logger) + for file_name in (file_name_1, file_name_2): + with codecs.open(file_name, encoding='utf-8') as fp: + self.assertMatches(fp.read(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$') + finally: + os.unlink(file_name_1) + finally: + os.unlink(file_name_2) + self.assertMatches(stream.getvalue(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$') + self.assertEqual(err.getvalue(), '') + self.assertEqual(out.getvalue(), '') + + def test_handler_args(self): + out = StringIO() + err = StringIO() + stream = StringIO() + file_name = 'test_logging-test_handler_args' + + with replace_attr(sys, 'stdout', out, 'stderr', err): + common_config = finish_common_config('utf-8', {'log_file': [ + ['RotatingFileHandler', [[file_name]]] + ]}) + try: + logger, pl, get_module_attr = create_logger(common_config, stream=stream) + pl.error('Foo') + close_handlers(logger) + with codecs.open(file_name, encoding='utf-8') as fp: + self.assertMatches(fp.read(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$') + finally: + os.unlink(file_name) + self.assertEqual(stream.getvalue(), '') + self.assertEqual(err.getvalue(), '') + self.assertEqual(out.getvalue(), '') + + def test_handler_args_kwargs(self): + out = StringIO() + err = StringIO() + stream = StringIO() + file_name = 'test_logging-test_handler_args_kwargs' + + with replace_attr(sys, 'stdout', out, 'stderr', err): + common_config = finish_common_config('utf-8', {'log_file': [ + ['RotatingFileHandler', [[file_name], {'maxBytes': 1, 'backupCount': 1}]] + ]}) + try: + try: + logger, pl, get_module_attr = create_logger(common_config, stream=stream) + pl.error('Foo') + pl.error('Bar') + close_handlers(logger) + with codecs.open(file_name, encoding='utf-8') as fp: + self.assertMatches(fp.read(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Bar\n$') + with codecs.open(file_name + '.1', encoding='utf-8') as fp: + self.assertMatches(fp.read(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$') + finally: + os.unlink(file_name + '.1') + finally: + os.unlink(file_name) + self.assertEqual(stream.getvalue(), '') + self.assertEqual(err.getvalue(), '') + self.assertEqual(out.getvalue(), '') + + def test_logger_level(self): + out = StringIO() + err = StringIO() + stream = StringIO() + stream1 = StringIO() + stream2 = StringIO() + + with replace_attr(sys, 'stdout', out, 'stderr', err): + common_config = finish_common_config('utf-8', {'log_file': [ + ['logging.StreamHandler', [[stream1]], 'WARNING'], + ['logging.StreamHandler', [[stream2]], 'ERROR'], + ]}) + logger, pl, get_module_attr = create_logger(common_config, stream=stream) + pl.warn('Foo') + pl.error('Bar') + close_handlers(logger) + self.assertMatches(stream1.getvalue(), ( + '^' + TIMESTAMP_RE + ':WARNING:__unknown__:Foo\n' + + TIMESTAMP_RE + ':ERROR:__unknown__:Bar\n$' + )) + self.assertMatches(stream2.getvalue(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Bar\n$') + self.assertEqual(stream.getvalue(), '') + self.assertEqual(err.getvalue(), '') + self.assertEqual(out.getvalue(), '') + + def test_logger_level_not_overriding_default(self): + out = StringIO() + err = StringIO() + stream = StringIO() + stream1 = StringIO() + + with replace_attr(sys, 'stdout', out, 'stderr', err): + common_config = finish_common_config('utf-8', {'log_file': [ + ['logging.StreamHandler', [[stream1]], 'DEBUG'], + ]}) + logger, pl, get_module_attr = create_logger(common_config, stream=stream) + pl.debug('Foo') + pl.error('Bar') + close_handlers(logger) + self.assertMatches(stream1.getvalue(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Bar\n$') + self.assertEqual(stream.getvalue(), '') + self.assertEqual(err.getvalue(), '') + self.assertEqual(out.getvalue(), '') + + def test_top_log_level(self): + out = StringIO() + err = StringIO() + stream = StringIO() + stream1 = StringIO() + + with replace_attr(sys, 'stdout', out, 'stderr', err): + common_config = finish_common_config('utf-8', {'log_file': [ + ['logging.StreamHandler', [[stream1]], 'DEBUG'], + ], 'log_level': 'DEBUG'}) + logger, pl, get_module_attr = create_logger(common_config, stream=stream) + pl.debug('Foo') + pl.error('Bar') + close_handlers(logger) + self.assertMatches(stream1.getvalue(), ( + '^' + TIMESTAMP_RE + ':DEBUG:__unknown__:Foo\n' + + TIMESTAMP_RE + ':ERROR:__unknown__:Bar\n$' + )) + self.assertEqual(stream.getvalue(), '') + self.assertEqual(err.getvalue(), '') + self.assertEqual(out.getvalue(), '') + + def test_logger_format(self): + out = StringIO() + err = StringIO() + stream = StringIO() + stream1 = StringIO() + + with replace_attr(sys, 'stdout', out, 'stderr', err): + common_config = finish_common_config('utf-8', {'log_file': [ + ['logging.StreamHandler', [[stream1]], 'WARNING', 'FOO'], + ]}) + logger, pl, get_module_attr = create_logger(common_config, stream=stream) + pl.warn('Foo') + pl.error('Bar') + close_handlers(logger) + self.assertEqual(stream1.getvalue(), 'FOO\nFOO\n') + self.assertEqual(stream.getvalue(), '') + self.assertEqual(err.getvalue(), '') + self.assertEqual(out.getvalue(), '') + + def test_top_log_format(self): + out = StringIO() + err = StringIO() + stream = StringIO() + stream1 = StringIO() + stream2 = StringIO() + + with replace_attr(sys, 'stdout', out, 'stderr', err): + common_config = finish_common_config('utf-8', {'log_file': [ + ['logging.StreamHandler', [[stream1]], 'WARNING', 'FOO'], + ['logging.StreamHandler', [[stream2]], 'WARNING'], + ], 'log_format': 'BAR'}) + logger, pl, get_module_attr = create_logger(common_config, stream=stream) + pl.warn('Foo') + pl.error('Bar') + close_handlers(logger) + self.assertEqual(stream2.getvalue(), 'BAR\nBAR\n') + self.assertEqual(stream1.getvalue(), 'FOO\nFOO\n') + self.assertEqual(stream.getvalue(), '') + self.assertEqual(err.getvalue(), '') + self.assertEqual(out.getvalue(), '') + + +class TestPowerlineLogger(TestRE): + def test_args_formatting(self): + stream = StringIO() + + common_config = finish_common_config('utf-8', {}) + logger, pl, get_module_attr = create_logger(common_config, stream=stream) + pl.warn('foo {0}', 'Test') + pl.warn('bar {0!r}', 'Test') + close_handlers(logger) + self.assertMatches(stream.getvalue(), ( + '^' + TIMESTAMP_RE + ':WARNING:__unknown__:foo Test\n' + + TIMESTAMP_RE + ':WARNING:__unknown__:bar u?\'Test\'\n$' + )) + + def test_prefix_formatting(self): + stream = StringIO() + + common_config = finish_common_config('utf-8', {}) + logger, pl, get_module_attr = create_logger(common_config, stream=stream) + pl.prefix = '1' + pl.warn('foo') + pl.prefix = '2' + pl.warn('bar') + close_handlers(logger) + self.assertMatches(stream.getvalue(), ( + '^' + TIMESTAMP_RE + ':WARNING:__unknown__:1:foo\n' + + TIMESTAMP_RE + ':WARNING:__unknown__:2:bar\n$' + )) + + def test_kwargs_formatting(self): + stream = StringIO() + + common_config = finish_common_config('utf-8', {}) + logger, pl, get_module_attr = create_logger(common_config, stream=stream) + pl.warn('foo {arg}', arg='Test') + pl.warn('bar {arg!r}', arg='Test') + close_handlers(logger) + self.assertMatches(stream.getvalue(), ( + '^' + TIMESTAMP_RE + ':WARNING:__unknown__:foo Test\n' + + TIMESTAMP_RE + ':WARNING:__unknown__:bar u?\'Test\'\n$' + )) + + def test_args_kwargs_formatting(self): + stream = StringIO() + + common_config = finish_common_config('utf-8', {}) + logger, pl, get_module_attr = create_logger(common_config, stream=stream) + pl.warn('foo {0!r} {arg}', 'Test0', arg='Test') + pl.warn('bar {0} {arg!r}', 'Test0', arg='Test') + close_handlers(logger) + self.assertMatches(stream.getvalue(), ( + '^' + TIMESTAMP_RE + ':WARNING:__unknown__:foo u?\'Test0\' Test\n' + + TIMESTAMP_RE + ':WARNING:__unknown__:bar Test0 u?\'Test\'\n$' + )) + + def test_exception_formatting(self): + stream = StringIO() + + common_config = finish_common_config('utf-8', {}) + logger, pl, get_module_attr = create_logger(common_config, stream=stream) + try: + raise ValueError('foo') + except ValueError: + pl.exception('Message') + close_handlers(logger) + self.assertMatches(stream.getvalue(), ( + '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Message\n' + + 'Traceback \\(most recent call last\\):\n' + + '(?: File ".*?", line \\d+, in \\w+\n [^\n]*\n)+' + + 'ValueError: foo\n$' + )) + + def test_levels(self): + stream = StringIO() + + common_config = finish_common_config('utf-8', {'log_level': 'DEBUG'}) + logger, pl, get_module_attr = create_logger(common_config, stream=stream) + pl.debug('1') + pl.info('2') + pl.warn('3') + pl.error('4') + pl.critical('5') + close_handlers(logger) + self.assertMatches(stream.getvalue(), ( + '^' + TIMESTAMP_RE + ':DEBUG:__unknown__:1\n' + + TIMESTAMP_RE + ':INFO:__unknown__:2\n' + + TIMESTAMP_RE + ':WARNING:__unknown__:3\n' + + TIMESTAMP_RE + ':ERROR:__unknown__:4\n' + + TIMESTAMP_RE + ':CRITICAL:__unknown__:5\n$' + )) + + +old_cwd = None + + +def setUpModule(): + global old_cwd + global __file__ + old_cwd = os.getcwd() + __file__ = os.path.abspath(__file__) + os.chdir(os.path.dirname(os.path.dirname(__file__))) + + +def tearDownModule(): + global old_cwd + os.chdir(old_cwd) + + +if __name__ == '__main__': + from tests.modules import main + main() diff --git a/tests/test_python/test_provided_config_files.py b/tests/test_python/test_provided_config_files.py new file mode 100644 index 0000000..fd8b16e --- /dev/null +++ b/tests/test_python/test_provided_config_files.py @@ -0,0 +1,201 @@ +# vim:fileencoding=utf-8:noet + +'''Dynamic configuration files tests.''' + +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import sys +import os +import json +import logging + +import tests.modules.vim as vim_module + +from tests.modules.lib import Args, urllib_read, replace_attr +from tests.modules import TestCase + +from powerline import NotInterceptedError +from powerline.segments.common import wthr + + +VBLOCK = chr(ord('V') - 0x40) +SBLOCK = chr(ord('S') - 0x40) + + +class FailingLogger(logging.Logger): + def exception(self, *args, **kwargs): + super(FailingLogger, self).exception(*args, **kwargs) + raise NotInterceptedError('Unexpected exception occurred') + + +def get_logger(stream=None): + log_format = '%(asctime)s:%(levelname)s:%(message)s' + formatter = logging.Formatter(log_format) + + level = logging.WARNING + handler = logging.StreamHandler(stream) + handler.setLevel(level) + handler.setFormatter(formatter) + + logger = FailingLogger('powerline') + logger.setLevel(level) + logger.addHandler(handler) + return logger + + +class TestVimConfig(TestCase): + def test_vim(self): + from powerline.vim import VimPowerline + cfg_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'powerline', 'config_files') + buffers = ( + (('bufoptions',), {'buftype': 'help'}), + (('bufname', '[Command Line]'), {}), + (('bufoptions',), {'buftype': 'quickfix'}), + (('bufname', 'NERD_tree_1'), {}), + (('bufname', '__Gundo__'), {}), + (('bufname', '__Gundo_Preview__'), {}), + # No Command-T tests here: requires +ruby or emulation + # No tabline here: tablines are tested separately + ) + with open(os.path.join(cfg_path, 'config.json'), 'r') as f: + local_themes_raw = json.load(f)['ext']['vim']['local_themes'] + # Don’t run tests on external/plugin segments + local_themes = dict((k, v) for (k, v) in local_themes_raw.items()) + # See end of the buffers definition above for `- 2` + self.assertEqual(len(buffers), len(local_themes) - 2) + outputs = {} + i = 0 + + with vim_module._with('split'): + with VimPowerline(logger=get_logger()) as powerline: + def check_output(mode, args, kwargs): + if mode == 'nc': + window = vim_module.windows[0] + window_id = 2 + else: + vim_module._start_mode(mode) + window = vim_module.current.window + window_id = 1 + winnr = window.number + out = powerline.render(window, window_id, winnr) + if out in outputs: + self.fail('Duplicate in set #{0} ({1}) for mode {2!r} (previously defined in set #{3} ({4!r}) for mode {5!r})'.format(i, (args, kwargs), mode, *outputs[out])) + outputs[out] = (i, (args, kwargs), mode) + + with vim_module._with('bufname', '/tmp/foo.txt'): + out = powerline.render(vim_module.current.window, 1, vim_module.current.window.number, is_tabline=True) + outputs[out] = (-1, (None, None), 'tab') + with vim_module._with('globals', powerline_config_paths=[cfg_path]): + exclude = set(('no', 'v', 'V', VBLOCK, 's', 'S', SBLOCK, 'R', 'Rv', 'c', 'cv', 'ce', 'r', 'rm', 'r?', '!')) + try: + for mode in ['n', 'nc', 'no', 'v', 'V', VBLOCK, 's', 'S', SBLOCK, 'i', 'R', 'Rv', 'c', 'cv', 'ce', 'r', 'rm', 'r?', '!']: + check_output(mode, None, None) + for args, kwargs in buffers: + i += 1 + if mode in exclude: + continue + with vim_module._with(*args, **kwargs): + check_output(mode, args, kwargs) + finally: + vim_module._start_mode('n') + + @classmethod + def setUpClass(cls): + sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(os.path.dirname(__file__)), 'vim_sys_path'))) + + @classmethod + def tearDownClass(cls): + sys.path.pop(0) + + +class TestConfig(TestCase): + def test_tmux(self): + from powerline.segments import common + from imp import reload + reload(common) + from powerline.shell import ShellPowerline + with replace_attr(common, 'urllib_read', urllib_read): + with ShellPowerline(Args(ext=['tmux']), logger=get_logger(), run_once=False) as powerline: + powerline.render() + with ShellPowerline(Args(ext=['tmux']), logger=get_logger(), run_once=False) as powerline: + powerline.render() + + def test_zsh(self): + from powerline.shell import ShellPowerline + args = Args(last_pipe_status=[1, 0], jobnum=0, ext=['shell'], renderer_module='.zsh') + segment_info = {'args': args} + with ShellPowerline(args, logger=get_logger(), run_once=False) as powerline: + powerline.render(segment_info=segment_info) + with ShellPowerline(args, logger=get_logger(), run_once=False) as powerline: + powerline.render(segment_info=segment_info) + segment_info['local_theme'] = 'select' + with ShellPowerline(args, logger=get_logger(), run_once=False) as powerline: + powerline.render(segment_info=segment_info) + segment_info['local_theme'] = 'continuation' + segment_info['parser_state'] = 'if cmdsubst' + with ShellPowerline(args, logger=get_logger(), run_once=False) as powerline: + powerline.render(segment_info=segment_info) + + def test_bash(self): + from powerline.shell import ShellPowerline + args = Args(last_exit_code=1, last_pipe_status=[], jobnum=0, ext=['shell'], renderer_module='.bash', config_override={'ext': {'shell': {'theme': 'default_leftonly'}}}) + with ShellPowerline(args, logger=get_logger(), run_once=False) as powerline: + powerline.render(segment_info={'args': args}) + with ShellPowerline(args, logger=get_logger(), run_once=False) as powerline: + powerline.render(segment_info={'args': args}) + + def test_ipython(self): + from powerline.ipython import IPythonPowerline + + class IpyPowerline(IPythonPowerline): + config_paths = None + config_overrides = None + theme_overrides = {} + + segment_info = Args(prompt_count=1) + + with IpyPowerline(logger=get_logger(), renderer_module='.pre_5') as powerline: + for prompt_type in ['in', 'in2']: + powerline.render(is_prompt=True, matcher_info=prompt_type, segment_info=segment_info) + powerline.render(is_prompt=True, matcher_info=prompt_type, segment_info=segment_info) + with IpyPowerline(logger=get_logger(), renderer_module='.pre_5') as powerline: + for prompt_type in ['out', 'rewrite']: + powerline.render(is_prompt=False, matcher_info=prompt_type, segment_info=segment_info) + powerline.render(is_prompt=False, matcher_info=prompt_type, segment_info=segment_info) + + def test_wm(self): + from powerline.segments import common + from imp import reload + reload(common) + from powerline import Powerline + with replace_attr(wthr, 'urllib_read', urllib_read): + Powerline(logger=get_logger(), ext='wm', renderer_module='pango_markup', run_once=True).render() + reload(common) + + +old_cwd = None +saved_get_config_paths = None + + +def setUpModule(): + global old_cwd + global saved_get_config_paths + import powerline + saved_get_config_paths = powerline.get_config_paths + path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'powerline', 'config_files') + powerline.get_config_paths = lambda: [path] + old_cwd = os.getcwd() + + +def tearDownModule(): + global old_cwd + global saved_get_config_paths + import powerline + powerline.get_config_paths = saved_get_config_paths + os.chdir(old_cwd) + old_cwd = None + + +if __name__ == '__main__': + from tests.modules import main + main() diff --git a/tests/test_python/test_segments.py b/tests/test_python/test_segments.py new file mode 100644 index 0000000..3f09470 --- /dev/null +++ b/tests/test_python/test_segments.py @@ -0,0 +1,1711 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import sys +import os + +from functools import partial +from collections import namedtuple +from time import sleep +from platform import python_implementation + +from powerline.segments import shell, tmux, pdb, i3wm +from powerline.lib.vcs import get_fallback_create_watcher +from powerline.lib.unicode import out_u + +import tests.modules.vim as vim_module + +from tests.modules.lib import (Args, urllib_read, replace_attr, new_module, + replace_module_module, replace_env, Pl) +from tests.modules import TestCase, SkipTest + + +def get_dummy_guess(**kwargs): + if 'directory' in kwargs: + def guess(path, create_watcher): + return Args(branch=lambda: out_u(os.path.basename(path)), **kwargs) + else: + def guess(path, create_watcher): + return Args(branch=lambda: out_u(os.path.basename(path)), directory=path, **kwargs) + return guess + + +class TestShell(TestCase): + def test_last_status(self): + pl = Pl() + segment_info = {'args': Args(last_exit_code=10)} + self.assertEqual(shell.last_status(pl=pl, segment_info=segment_info), [ + {'contents': '10', 'highlight_groups': ['exit_fail']} + ]) + segment_info['args'].last_exit_code = 0 + self.assertEqual(shell.last_status(pl=pl, segment_info=segment_info), None) + segment_info['args'].last_exit_code = None + self.assertEqual(shell.last_status(pl=pl, segment_info=segment_info), None) + segment_info['args'].last_exit_code = 'sigsegv' + self.assertEqual(shell.last_status(pl=pl, segment_info=segment_info), [ + {'contents': 'sigsegv', 'highlight_groups': ['exit_fail']} + ]) + segment_info['args'].last_exit_code = 'sigsegv+core' + self.assertEqual(shell.last_status(pl=pl, segment_info=segment_info), [ + {'contents': 'sigsegv+core', 'highlight_groups': ['exit_fail']} + ]) + + def test_last_pipe_status(self): + pl = Pl() + segment_info = {'args': Args(last_pipe_status=[], last_exit_code=0)} + self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=segment_info), None) + segment_info['args'].last_pipe_status = [0, 0, 0] + self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=segment_info), None) + segment_info['args'].last_pipe_status = [0, 0] + self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=segment_info), None) + segment_info['args'].last_pipe_status = [0] + self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=segment_info), None) + segment_info['args'].last_pipe_status = [0, 2, 0] + self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=segment_info), [ + {'contents': '0', 'highlight_groups': ['exit_success'], 'draw_inner_divider': True}, + {'contents': '2', 'highlight_groups': ['exit_fail'], 'draw_inner_divider': True}, + {'contents': '0', 'highlight_groups': ['exit_success'], 'draw_inner_divider': True}, + ]) + segment_info['args'].last_pipe_status = [2, 0, 0] + self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=segment_info), [ + {'contents': '2', 'highlight_groups': ['exit_fail'], 'draw_inner_divider': True}, + {'contents': '0', 'highlight_groups': ['exit_success'], 'draw_inner_divider': True}, + {'contents': '0', 'highlight_groups': ['exit_success'], 'draw_inner_divider': True}, + ]) + segment_info['args'].last_pipe_status = [0, 0, 2] + self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=segment_info), [ + {'contents': '0', 'highlight_groups': ['exit_success'], 'draw_inner_divider': True}, + {'contents': '0', 'highlight_groups': ['exit_success'], 'draw_inner_divider': True}, + {'contents': '2', 'highlight_groups': ['exit_fail'], 'draw_inner_divider': True}, + ]) + segment_info['args'].last_pipe_status = [2] + self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=segment_info), [ + {'contents': '2', 'highlight_groups': ['exit_fail'], 'draw_inner_divider': True}, + ]) + segment_info['args'].last_pipe_status = [0, 'sigsegv', 'sigsegv+core'] + self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=segment_info), [ + {'contents': '0', 'highlight_groups': ['exit_success'], 'draw_inner_divider': True}, + {'contents': 'sigsegv', 'highlight_groups': ['exit_fail'], 'draw_inner_divider': True}, + {'contents': 'sigsegv+core', 'highlight_groups': ['exit_fail'], 'draw_inner_divider': True} + ]) + segment_info['args'].last_pipe_status = [0, 'sigsegv', 0] + self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=segment_info), [ + {'contents': '0', 'highlight_groups': ['exit_success'], 'draw_inner_divider': True}, + {'contents': 'sigsegv', 'highlight_groups': ['exit_fail'], 'draw_inner_divider': True}, + {'contents': '0', 'highlight_groups': ['exit_success'], 'draw_inner_divider': True} + ]) + segment_info['args'].last_pipe_status = [0, 'sigsegv+core', 0] + self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=segment_info), [ + {'contents': '0', 'highlight_groups': ['exit_success'], 'draw_inner_divider': True}, + {'contents': 'sigsegv+core', 'highlight_groups': ['exit_fail'], 'draw_inner_divider': True}, + {'contents': '0', 'highlight_groups': ['exit_success'], 'draw_inner_divider': True} + ]) + segment_info['args'].last_pipe_status = [] + segment_info['args'].last_exit_code = 5 + self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=segment_info), [ + {'contents': '5', 'highlight_groups': ['exit_fail'], 'draw_inner_divider': True}, + ]) + + def test_jobnum(self): + pl = Pl() + segment_info = {'args': Args(jobnum=0)} + self.assertEqual(shell.jobnum(pl=pl, segment_info=segment_info), None) + self.assertEqual(shell.jobnum(pl=pl, segment_info=segment_info, show_zero=False), None) + self.assertEqual(shell.jobnum(pl=pl, segment_info=segment_info, show_zero=True), '0') + segment_info = {'args': Args(jobnum=1)} + self.assertEqual(shell.jobnum(pl=pl, segment_info=segment_info), '1') + self.assertEqual(shell.jobnum(pl=pl, segment_info=segment_info, show_zero=False), '1') + self.assertEqual(shell.jobnum(pl=pl, segment_info=segment_info, show_zero=True), '1') + + def test_continuation(self): + pl = Pl() + self.assertEqual(shell.continuation(pl=pl, segment_info={}), [{ + 'contents': '', + 'width': 'auto', + 'highlight_groups': ['continuation:current', 'continuation'], + }]) + segment_info = {'parser_state': 'if cmdsubst'} + self.assertEqual(shell.continuation(pl=pl, segment_info=segment_info), [ + { + 'contents': 'if', + 'draw_inner_divider': True, + 'highlight_groups': ['continuation:current', 'continuation'], + 'width': 'auto', + 'align': 'l', + }, + ]) + self.assertEqual(shell.continuation(pl=pl, segment_info=segment_info, right_align=True), [ + { + 'contents': 'if', + 'draw_inner_divider': True, + 'highlight_groups': ['continuation:current', 'continuation'], + 'width': 'auto', + 'align': 'r', + }, + ]) + self.assertEqual(shell.continuation(pl=pl, segment_info=segment_info, omit_cmdsubst=False), [ + { + 'contents': 'if', + 'draw_inner_divider': True, + 'highlight_groups': ['continuation'], + }, + { + 'contents': 'cmdsubst', + 'draw_inner_divider': True, + 'highlight_groups': ['continuation:current', 'continuation'], + 'width': 'auto', + 'align': 'l', + }, + ]) + self.assertEqual(shell.continuation(pl=pl, segment_info=segment_info, omit_cmdsubst=False, right_align=True), [ + { + 'contents': 'if', + 'draw_inner_divider': True, + 'highlight_groups': ['continuation'], + 'width': 'auto', + 'align': 'r', + }, + { + 'contents': 'cmdsubst', + 'draw_inner_divider': True, + 'highlight_groups': ['continuation:current', 'continuation'], + }, + ]) + self.assertEqual(shell.continuation(pl=pl, segment_info=segment_info, omit_cmdsubst=True, right_align=True), [ + { + 'contents': 'if', + 'draw_inner_divider': True, + 'highlight_groups': ['continuation:current', 'continuation'], + 'width': 'auto', + 'align': 'r', + }, + ]) + self.assertEqual(shell.continuation(pl=pl, segment_info=segment_info, omit_cmdsubst=True, right_align=True, renames={'if': 'IF'}), [ + { + 'contents': 'IF', + 'draw_inner_divider': True, + 'highlight_groups': ['continuation:current', 'continuation'], + 'width': 'auto', + 'align': 'r', + }, + ]) + self.assertEqual(shell.continuation(pl=pl, segment_info=segment_info, omit_cmdsubst=True, right_align=True, renames={'if': None}), [ + { + 'contents': '', + 'highlight_groups': ['continuation:current', 'continuation'], + 'width': 'auto', + 'align': 'r', + }, + ]) + segment_info = {'parser_state': 'then then then cmdsubst'} + self.assertEqual(shell.continuation(pl=pl, segment_info=segment_info), [ + { + 'contents': 'then', + 'draw_inner_divider': True, + 'highlight_groups': ['continuation'], + }, + { + 'contents': 'then', + 'draw_inner_divider': True, + 'highlight_groups': ['continuation'], + }, + { + 'contents': 'then', + 'draw_inner_divider': True, + 'highlight_groups': ['continuation:current', 'continuation'], + 'width': 'auto', + 'align': 'l', + }, + ]) + + def test_cwd(self): + new_os = new_module('os', path=os.path, sep='/') + pl = Pl() + cwd = [None] + + def getcwd(): + wd = cwd[0] + if isinstance(wd, Exception): + raise wd + else: + return wd + + segment_info = {'getcwd': getcwd, 'home': None} + with replace_attr(shell, 'os', new_os): + cwd[0] = '/abc/def/ghi/foo/bar' + self.assertEqual(shell.cwd(pl=pl, segment_info=segment_info), [ + {'contents': '/', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'abc', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'def', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'ghi', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'foo', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True, 'highlight_groups': ['cwd:current_folder', 'cwd']}, + ]) + segment_info['home'] = '/abc/def/ghi' + self.assertEqual(shell.cwd(pl=pl, segment_info=segment_info), [ + {'contents': '~', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'foo', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True, 'highlight_groups': ['cwd:current_folder', 'cwd']}, + ]) + segment_info.update(shortened_path='~foo/ghi') + self.assertEqual(shell.cwd(pl=pl, segment_info=segment_info), [ + {'contents': '~foo', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'ghi', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True, 'highlight_groups': ['cwd:current_folder', 'cwd']}, + ]) + self.assertEqual(shell.cwd(pl=pl, segment_info=segment_info, use_shortened_path=False), [ + {'contents': '~', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'foo', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True, 'highlight_groups': ['cwd:current_folder', 'cwd']}, + ]) + segment_info.pop('shortened_path') + self.assertEqual(shell.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=3), [ + {'contents': '~', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'foo', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True, 'highlight_groups': ['cwd:current_folder', 'cwd']} + ]) + self.assertEqual(shell.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=3, shorten_home=False), [ + {'contents': '...', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'ghi', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'foo', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True, 'highlight_groups': ['cwd:current_folder', 'cwd']} + ]) + self.assertEqual(shell.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=1), [ + {'contents': '...', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True, 'highlight_groups': ['cwd:current_folder', 'cwd']} + ]) + self.assertEqual(shell.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=1, ellipsis='---'), [ + {'contents': '---', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True, 'highlight_groups': ['cwd:current_folder', 'cwd']} + ]) + self.assertEqual(shell.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=1, ellipsis=None), [ + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True, 'highlight_groups': ['cwd:current_folder', 'cwd']} + ]) + self.assertEqual(shell.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=1, use_path_separator=True), [ + {'contents': '.../', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': False}, + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': False, 'highlight_groups': ['cwd:current_folder', 'cwd']} + ]) + self.assertEqual(shell.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=1, use_path_separator=True, ellipsis='---'), [ + {'contents': '---/', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': False}, + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': False, 'highlight_groups': ['cwd:current_folder', 'cwd']} + ]) + self.assertEqual(shell.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=1, use_path_separator=True, ellipsis=None), [ + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': False, 'highlight_groups': ['cwd:current_folder', 'cwd']} + ]) + self.assertEqual(shell.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=2, dir_shorten_len=2), [ + {'contents': '~', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'fo', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True, 'highlight_groups': ['cwd:current_folder', 'cwd']} + ]) + self.assertEqual(shell.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=2, dir_shorten_len=2, use_path_separator=True), [ + {'contents': '~/', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': False}, + {'contents': 'fo/', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': False}, + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': False, 'highlight_groups': ['cwd:current_folder', 'cwd']} + ]) + cwd[0] = '/etc' + self.assertEqual(shell.cwd(pl=pl, segment_info=segment_info, use_path_separator=False), [ + {'contents': '/', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'etc', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True, 'highlight_groups': ['cwd:current_folder', 'cwd']}, + ]) + self.assertEqual(shell.cwd(pl=pl, segment_info=segment_info, use_path_separator=True), [ + {'contents': '/', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': False}, + {'contents': 'etc', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': False, 'highlight_groups': ['cwd:current_folder', 'cwd']}, + ]) + cwd[0] = '/' + self.assertEqual(shell.cwd(pl=pl, segment_info=segment_info, use_path_separator=False), [ + {'contents': '/', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True, 'highlight_groups': ['cwd:current_folder', 'cwd']}, + ]) + self.assertEqual(shell.cwd(pl=pl, segment_info=segment_info, use_path_separator=True), [ + {'contents': '/', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': False, 'highlight_groups': ['cwd:current_folder', 'cwd']}, + ]) + ose = OSError() + ose.errno = 2 + cwd[0] = ose + self.assertEqual(shell.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=2, dir_shorten_len=2), [ + {'contents': '[not found]', 'divider_highlight_group': 'cwd:divider', 'highlight_groups': ['cwd:current_folder', 'cwd'], 'draw_inner_divider': True} + ]) + cwd[0] = OSError() + self.assertRaises(OSError, shell.cwd, pl=pl, segment_info=segment_info, dir_limit_depth=2, dir_shorten_len=2) + cwd[0] = ValueError() + self.assertRaises(ValueError, shell.cwd, pl=pl, segment_info=segment_info, dir_limit_depth=2, dir_shorten_len=2) + + +class TestTmux(TestCase): + def test_attached_clients(self): + def get_tmux_output(pl, cmd, *args): + if cmd == 'list-panes': + return 'session_name\n' + elif cmd == 'list-clients': + return '/dev/pts/2: 0 [191x51 xterm-256color] (utf8)\n/dev/pts/3: 0 [191x51 xterm-256color] (utf8)' + + pl = Pl() + with replace_attr(tmux, 'get_tmux_output', get_tmux_output): + self.assertEqual(tmux.attached_clients(pl=pl), '2') + self.assertEqual(tmux.attached_clients(pl=pl, minimum=3), None) + + +class TestCommon(TestCase): + @classmethod + def setUpClass(cls): + module = __import__(str('powerline.segments.common.{0}'.format(cls.module_name))) + cls.module = getattr(module.segments.common, str(cls.module_name)) + + +class TestNet(TestCommon): + module_name = 'net' + + def test_hostname(self): + pl = Pl() + with replace_env('SSH_CLIENT', '192.168.0.12 40921 22') as segment_info: + with replace_module_module(self.module, 'socket', gethostname=lambda: 'abc'): + self.assertEqual(self.module.hostname(pl=pl, segment_info=segment_info), 'abc') + self.assertEqual(self.module.hostname(pl=pl, segment_info=segment_info, only_if_ssh=True), 'abc') + with replace_module_module(self.module, 'socket', gethostname=lambda: 'abc.mydomain'): + self.assertEqual(self.module.hostname(pl=pl, segment_info=segment_info), 'abc.mydomain') + self.assertEqual(self.module.hostname(pl=pl, segment_info=segment_info, exclude_domain=True), 'abc') + self.assertEqual(self.module.hostname(pl=pl, segment_info=segment_info, only_if_ssh=True), 'abc.mydomain') + self.assertEqual(self.module.hostname(pl=pl, segment_info=segment_info, only_if_ssh=True, exclude_domain=True), 'abc') + segment_info['environ'].pop('SSH_CLIENT') + with replace_module_module(self.module, 'socket', gethostname=lambda: 'abc'): + self.assertEqual(self.module.hostname(pl=pl, segment_info=segment_info), 'abc') + self.assertEqual(self.module.hostname(pl=pl, segment_info=segment_info, only_if_ssh=True), None) + with replace_module_module(self.module, 'socket', gethostname=lambda: 'abc.mydomain'): + self.assertEqual(self.module.hostname(pl=pl, segment_info=segment_info), 'abc.mydomain') + self.assertEqual(self.module.hostname(pl=pl, segment_info=segment_info, exclude_domain=True), 'abc') + self.assertEqual(self.module.hostname(pl=pl, segment_info=segment_info, only_if_ssh=True, exclude_domain=True), None) + + def test_external_ip(self): + pl = Pl() + with replace_attr(self.module, 'urllib_read', urllib_read): + self.assertEqual(self.module.external_ip(pl=pl), [{'contents': '127.0.0.1', 'divider_highlight_group': 'background:divider'}]) + + def test_internal_ip(self): + try: + import netifaces + except ImportError: + raise SkipTest('netifaces module is not available') + pl = Pl() + addr = { + 'enp2s0': { + netifaces.AF_INET: [{'addr': '192.168.100.200'}], + netifaces.AF_INET6: [{'addr': 'feff::5446:5eff:fe5a:7777%enp2s0'}] + }, + 'lo': { + netifaces.AF_INET: [{'addr': '127.0.0.1'}], + netifaces.AF_INET6: [{'addr': '::1'}] + }, + 'teredo': { + netifaces.AF_INET6: [{'addr': 'feff::5446:5eff:fe5a:7777'}] + }, + } + interfaces = ['lo', 'enp2s0', 'teredo'] + with replace_module_module( + self.module, 'netifaces', + interfaces=(lambda: interfaces), + ifaddresses=(lambda interface: addr[interface]), + AF_INET=netifaces.AF_INET, + AF_INET6=netifaces.AF_INET6, + ): + self.assertEqual(self.module.internal_ip(pl=pl), '192.168.100.200') + self.assertEqual(self.module.internal_ip(pl=pl, interface='auto'), '192.168.100.200') + self.assertEqual(self.module.internal_ip(pl=pl, interface='lo'), '127.0.0.1') + self.assertEqual(self.module.internal_ip(pl=pl, interface='teredo'), None) + self.assertEqual(self.module.internal_ip(pl=pl, ipv=4), '192.168.100.200') + self.assertEqual(self.module.internal_ip(pl=pl, interface='auto', ipv=4), '192.168.100.200') + self.assertEqual(self.module.internal_ip(pl=pl, interface='lo', ipv=4), '127.0.0.1') + self.assertEqual(self.module.internal_ip(pl=pl, interface='teredo', ipv=4), None) + self.assertEqual(self.module.internal_ip(pl=pl, ipv=6), 'feff::5446:5eff:fe5a:7777%enp2s0') + self.assertEqual(self.module.internal_ip(pl=pl, interface='auto', ipv=6), 'feff::5446:5eff:fe5a:7777%enp2s0') + self.assertEqual(self.module.internal_ip(pl=pl, interface='lo', ipv=6), '::1') + self.assertEqual(self.module.internal_ip(pl=pl, interface='teredo', ipv=6), 'feff::5446:5eff:fe5a:7777') + interfaces[1:2] = () + self.assertEqual(self.module.internal_ip(pl=pl, ipv=6), 'feff::5446:5eff:fe5a:7777') + interfaces[1:2] = () + self.assertEqual(self.module.internal_ip(pl=pl, ipv=6), '::1') + interfaces[:] = () + self.assertEqual(self.module.internal_ip(pl=pl, ipv=6), None) + + gateways = { + 'default': { + netifaces.AF_INET: ('192.168.100.1', 'enp2s0'), + netifaces.AF_INET6: ('feff::5446:5eff:fe5a:0001', 'enp2s0') + } + } + + with replace_module_module( + self.module, 'netifaces', + interfaces=(lambda: interfaces), + ifaddresses=(lambda interface: addr[interface]), + gateways=(lambda: gateways), + AF_INET=netifaces.AF_INET, + AF_INET6=netifaces.AF_INET6, + ): + # default gateway has specified address family + self.assertEqual(self.module.internal_ip(pl=pl, interface='default_gateway', ipv=4), '192.168.100.200') + self.assertEqual(self.module.internal_ip(pl=pl, interface='default_gateway', ipv=6), 'feff::5446:5eff:fe5a:7777%enp2s0') + # default gateway doesn't have specified address family + gateways['default'] = {} + self.assertEqual(self.module.internal_ip(pl=pl, interface='default_gateway', ipv=4), None) + self.assertEqual(self.module.internal_ip(pl=pl, interface='default_gateway', ipv=6), None) + + def test_network_load(self): + def gb(interface): + return None + + f = [gb] + + def _get_bytes(interface): + return f[0](interface) + + pl = Pl() + + with replace_attr(self.module, '_get_bytes', _get_bytes): + self.module.network_load.startup(pl=pl) + try: + self.assertEqual(self.module.network_load(pl=pl, interface='eth0'), None) + sleep(self.module.network_load.interval) + self.assertEqual(self.module.network_load(pl=pl, interface='eth0'), None) + while 'prev' not in self.module.network_load.interfaces.get('eth0', {}): + sleep(0.1) + self.assertEqual(self.module.network_load(pl=pl, interface='eth0'), None) + + l = [0, 0] + + def gb2(interface): + l[0] += 1200 + l[1] += 2400 + return tuple(l) + f[0] = gb2 + + while not self.module.network_load.interfaces.get('eth0', {}).get('prev', (None, None))[1]: + sleep(0.1) + self.assertEqual(self.module.network_load(pl=pl, interface='eth0'), [ + {'divider_highlight_group': 'network_load:divider', 'contents': 'DL 1 KiB/s', 'highlight_groups': ['network_load_recv', 'network_load']}, + {'divider_highlight_group': 'network_load:divider', 'contents': 'UL 2 KiB/s', 'highlight_groups': ['network_load_sent', 'network_load']}, + ]) + self.assertEqual(self.module.network_load(pl=pl, interface='eth0', recv_format='r {value}', sent_format='s {value}'), [ + {'divider_highlight_group': 'network_load:divider', 'contents': 'r 1 KiB/s', 'highlight_groups': ['network_load_recv', 'network_load']}, + {'divider_highlight_group': 'network_load:divider', 'contents': 's 2 KiB/s', 'highlight_groups': ['network_load_sent', 'network_load']}, + ]) + self.assertEqual(self.module.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', suffix='bps', interface='eth0'), [ + {'divider_highlight_group': 'network_load:divider', 'contents': 'r 1 Kibps', 'highlight_groups': ['network_load_recv', 'network_load']}, + {'divider_highlight_group': 'network_load:divider', 'contents': 's 2 Kibps', 'highlight_groups': ['network_load_sent', 'network_load']}, + ]) + self.assertEqual(self.module.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', si_prefix=True, interface='eth0'), [ + {'divider_highlight_group': 'network_load:divider', 'contents': 'r 1 kB/s', 'highlight_groups': ['network_load_recv', 'network_load']}, + {'divider_highlight_group': 'network_load:divider', 'contents': 's 2 kB/s', 'highlight_groups': ['network_load_sent', 'network_load']}, + ]) + self.assertEqual(self.module.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', recv_max=0, interface='eth0'), [ + {'divider_highlight_group': 'network_load:divider', 'contents': 'r 1 KiB/s', 'highlight_groups': ['network_load_recv_gradient', 'network_load_gradient', 'network_load_recv', 'network_load'], 'gradient_level': 100}, + {'divider_highlight_group': 'network_load:divider', 'contents': 's 2 KiB/s', 'highlight_groups': ['network_load_sent', 'network_load']}, + ]) + + class ApproxEqual(object): + def __eq__(self, i): + return abs(i - 50.0) < 1 + + self.assertEqual(self.module.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', sent_max=4800, interface='eth0'), [ + {'divider_highlight_group': 'network_load:divider', 'contents': 'r 1 KiB/s', 'highlight_groups': ['network_load_recv', 'network_load']}, + {'divider_highlight_group': 'network_load:divider', 'contents': 's 2 KiB/s', 'highlight_groups': ['network_load_sent_gradient', 'network_load_gradient', 'network_load_sent', 'network_load'], 'gradient_level': ApproxEqual()}, + ]) + finally: + self.module.network_load.shutdown() + + +class TestEnv(TestCommon): + module_name = 'env' + + def test_user(self): + new_os = new_module('os', getpid=lambda: 1) + + class Process(object): + def __init__(self, pid): + pass + + def username(self): + return 'def@DOMAIN.COM' + + if hasattr(self.module, 'psutil') and not callable(self.module.psutil.Process.username): + username = property(username) + + segment_info = {'environ': {}} + + def user(*args, **kwargs): + return self.module.user(pl=pl, segment_info=segment_info, *args, **kwargs) + + struct_passwd = namedtuple('struct_passwd', ('pw_name',)) + new_psutil = new_module('psutil', Process=Process) + new_pwd = new_module('pwd', getpwuid=lambda uid: struct_passwd(pw_name='def@DOMAIN.COM')) + new_getpass = new_module('getpass', getuser=lambda: 'def@DOMAIN.COM') + pl = Pl() + with replace_attr(self.module, 'pwd', new_pwd): + with replace_attr(self.module, 'getpass', new_getpass): + with replace_attr(self.module, 'os', new_os): + with replace_attr(self.module, 'psutil', new_psutil): + with replace_attr(self.module, '_geteuid', lambda: 5): + self.assertEqual(user(), [ + {'contents': 'def@DOMAIN.COM', 'highlight_groups': ['user']} + ]) + self.assertEqual(user(hide_user='abc'), [ + {'contents': 'def@DOMAIN.COM', 'highlight_groups': ['user']} + ]) + self.assertEqual(user(hide_domain=False), [ + {'contents': 'def@DOMAIN.COM', 'highlight_groups': ['user']} + ]) + self.assertEqual(user(hide_user='def@DOMAIN.COM'), None) + self.assertEqual(user(hide_domain=True), [ + {'contents': 'def', 'highlight_groups': ['user']} + ]) + with replace_attr(self.module, '_geteuid', lambda: 0): + self.assertEqual(user(), [ + {'contents': 'def', 'highlight_groups': ['superuser', 'user']} + ]) + + def test_cwd(self): + new_os = new_module('os', path=os.path, sep='/') + pl = Pl() + cwd = [None] + + def getcwd(): + wd = cwd[0] + if isinstance(wd, Exception): + raise wd + else: + return wd + + segment_info = {'getcwd': getcwd, 'home': None} + with replace_attr(self.module, 'os', new_os): + cwd[0] = '/abc/def/ghi/foo/bar' + self.assertEqual(self.module.cwd(pl=pl, segment_info=segment_info), [ + {'contents': '/', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'abc', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'def', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'ghi', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'foo', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True, 'highlight_groups': ['cwd:current_folder', 'cwd']}, + ]) + segment_info['home'] = '/abc/def/ghi' + self.assertEqual(self.module.cwd(pl=pl, segment_info=segment_info), [ + {'contents': '~', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'foo', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True, 'highlight_groups': ['cwd:current_folder', 'cwd']}, + ]) + self.assertEqual(self.module.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=3), [ + {'contents': '~', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'foo', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True, 'highlight_groups': ['cwd:current_folder', 'cwd']} + ]) + self.assertEqual(self.module.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=3, shorten_home=False), [ + {'contents': '...', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'ghi', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'foo', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True, 'highlight_groups': ['cwd:current_folder', 'cwd']} + ]) + self.assertEqual(self.module.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=1), [ + {'contents': '...', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True, 'highlight_groups': ['cwd:current_folder', 'cwd']} + ]) + self.assertEqual(self.module.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=1, ellipsis='---'), [ + {'contents': '---', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True, 'highlight_groups': ['cwd:current_folder', 'cwd']} + ]) + self.assertEqual(self.module.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=1, ellipsis=None), [ + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True, 'highlight_groups': ['cwd:current_folder', 'cwd']} + ]) + self.assertEqual(self.module.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=1, use_path_separator=True), [ + {'contents': '.../', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': False}, + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': False, 'highlight_groups': ['cwd:current_folder', 'cwd']} + ]) + self.assertEqual(self.module.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=1, use_path_separator=True, ellipsis='---'), [ + {'contents': '---/', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': False}, + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': False, 'highlight_groups': ['cwd:current_folder', 'cwd']} + ]) + self.assertEqual(self.module.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=1, use_path_separator=True, ellipsis=None), [ + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': False, 'highlight_groups': ['cwd:current_folder', 'cwd']} + ]) + self.assertEqual(self.module.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=2, dir_shorten_len=2), [ + {'contents': '~', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'fo', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True, 'highlight_groups': ['cwd:current_folder', 'cwd']} + ]) + self.assertEqual(self.module.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=2, dir_shorten_len=2, use_path_separator=True), [ + {'contents': '~/', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': False}, + {'contents': 'fo/', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': False}, + {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': False, 'highlight_groups': ['cwd:current_folder', 'cwd']} + ]) + cwd[0] = '/etc' + self.assertEqual(self.module.cwd(pl=pl, segment_info=segment_info, use_path_separator=False), [ + {'contents': '/', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True}, + {'contents': 'etc', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True, 'highlight_groups': ['cwd:current_folder', 'cwd']}, + ]) + self.assertEqual(self.module.cwd(pl=pl, segment_info=segment_info, use_path_separator=True), [ + {'contents': '/', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': False}, + {'contents': 'etc', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': False, 'highlight_groups': ['cwd:current_folder', 'cwd']}, + ]) + cwd[0] = '/' + self.assertEqual(self.module.cwd(pl=pl, segment_info=segment_info, use_path_separator=False), [ + {'contents': '/', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': True, 'highlight_groups': ['cwd:current_folder', 'cwd']}, + ]) + self.assertEqual(self.module.cwd(pl=pl, segment_info=segment_info, use_path_separator=True), [ + {'contents': '/', 'divider_highlight_group': 'cwd:divider', 'draw_inner_divider': False, 'highlight_groups': ['cwd:current_folder', 'cwd']}, + ]) + ose = OSError() + ose.errno = 2 + cwd[0] = ose + self.assertEqual(self.module.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=2, dir_shorten_len=2), [ + {'contents': '[not found]', 'divider_highlight_group': 'cwd:divider', 'highlight_groups': ['cwd:current_folder', 'cwd'], 'draw_inner_divider': True} + ]) + cwd[0] = OSError() + self.assertRaises(OSError, self.module.cwd, pl=pl, segment_info=segment_info, dir_limit_depth=2, dir_shorten_len=2) + cwd[0] = ValueError() + self.assertRaises(ValueError, self.module.cwd, pl=pl, segment_info=segment_info, dir_limit_depth=2, dir_shorten_len=2) + + def test_virtualenv(self): + pl = Pl() + with replace_env('VIRTUAL_ENV', '/abc/def/ghi') as segment_info: + self.assertEqual(self.module.virtualenv(pl=pl, segment_info=segment_info), 'ghi') + self.assertEqual(self.module.virtualenv(pl=pl, segment_info=segment_info, ignore_conda=True), 'ghi') + self.assertEqual(self.module.virtualenv(pl=pl, segment_info=segment_info, ignore_venv=True), None) + self.assertEqual(self.module.virtualenv(pl=pl, segment_info=segment_info, ignore_venv=True, ignore_conda=True), None) + + segment_info['environ'].pop('VIRTUAL_ENV') + self.assertEqual(self.module.virtualenv(pl=pl, segment_info=segment_info), None) + self.assertEqual(self.module.virtualenv(pl=pl, segment_info=segment_info, ignore_conda=True), None) + self.assertEqual(self.module.virtualenv(pl=pl, segment_info=segment_info, ignore_venv=True), None) + self.assertEqual(self.module.virtualenv(pl=pl, segment_info=segment_info, ignore_venv=True, ignore_conda=True), None) + + with replace_env('CONDA_DEFAULT_ENV', 'foo') as segment_info: + self.assertEqual(self.module.virtualenv(pl=pl, segment_info=segment_info), 'foo') + self.assertEqual(self.module.virtualenv(pl=pl, segment_info=segment_info, ignore_conda=True), None) + self.assertEqual(self.module.virtualenv(pl=pl, segment_info=segment_info, ignore_venv=True), 'foo') + self.assertEqual(self.module.virtualenv(pl=pl, segment_info=segment_info, ignore_venv=True, ignore_conda=True), None) + + segment_info['environ'].pop('CONDA_DEFAULT_ENV') + self.assertEqual(self.module.virtualenv(pl=pl, segment_info=segment_info), None) + self.assertEqual(self.module.virtualenv(pl=pl, segment_info=segment_info, ignore_conda=True), None) + self.assertEqual(self.module.virtualenv(pl=pl, segment_info=segment_info, ignore_venv=True), None) + self.assertEqual(self.module.virtualenv(pl=pl, segment_info=segment_info, ignore_venv=True, ignore_conda=True), None) + + with replace_env('CONDA_DEFAULT_ENV', 'foo', environ={'VIRTUAL_ENV': '/sbc/def/ghi'}) as segment_info: + self.assertEqual(self.module.virtualenv(pl=pl, segment_info=segment_info), 'ghi') + self.assertEqual(self.module.virtualenv(pl=pl, segment_info=segment_info, ignore_conda=True), 'ghi') + self.assertEqual(self.module.virtualenv(pl=pl, segment_info=segment_info, ignore_venv=True), 'foo') + self.assertEqual(self.module.virtualenv(pl=pl, segment_info=segment_info, ignore_venv=True, ignore_conda=True), None) + + segment_info['environ'].pop('CONDA_DEFAULT_ENV') + self.assertEqual(self.module.virtualenv(pl=pl, segment_info=segment_info), 'ghi') + self.assertEqual(self.module.virtualenv(pl=pl, segment_info=segment_info, ignore_conda=True), 'ghi') + self.assertEqual(self.module.virtualenv(pl=pl, segment_info=segment_info, ignore_venv=True), None) + self.assertEqual(self.module.virtualenv(pl=pl, segment_info=segment_info, ignore_venv=True, ignore_conda=True), None) + + def test_environment(self): + pl = Pl() + variable = 'FOO' + value = 'bar' + with replace_env(variable, value) as segment_info: + self.assertEqual(self.module.environment(pl=pl, segment_info=segment_info, variable=variable), value) + segment_info['environ'].pop(variable) + self.assertEqual(self.module.environment(pl=pl, segment_info=segment_info, variable=variable), None) + + +class TestVcs(TestCommon): + module_name = 'vcs' + + def test_branch(self): + pl = Pl() + create_watcher = get_fallback_create_watcher() + segment_info = {'getcwd': os.getcwd} + branch = partial(self.module.branch, pl=pl, create_watcher=create_watcher) + with replace_attr(self.module, 'guess', get_dummy_guess(status=lambda: None, directory='/tmp/tests')): + with replace_attr(self.module, 'tree_status', lambda repo, pl: None): + self.assertEqual(branch(segment_info=segment_info, status_colors=False), [{ + 'highlight_groups': ['branch'], + 'contents': 'tests', + 'divider_highlight_group': None + }]) + self.assertEqual(branch(segment_info=segment_info, status_colors=True), [{ + 'contents': 'tests', + 'highlight_groups': ['branch_clean', 'branch'], + 'divider_highlight_group': None + }]) + with replace_attr(self.module, 'guess', get_dummy_guess(status=lambda: 'D ', directory='/tmp/tests')): + with replace_attr(self.module, 'tree_status', lambda repo, pl: 'D '): + self.assertEqual(branch(segment_info=segment_info, status_colors=False), [{ + 'highlight_groups': ['branch'], + 'contents': 'tests', + 'divider_highlight_group': None + }]) + self.assertEqual(branch(segment_info=segment_info, status_colors=True), [{ + 'contents': 'tests', + 'highlight_groups': ['branch_dirty', 'branch'], + 'divider_highlight_group': None + }]) + self.assertEqual(branch(segment_info=segment_info, status_colors=False), [{ + 'highlight_groups': ['branch'], + 'contents': 'tests', + 'divider_highlight_group': None + }]) + with replace_attr(self.module, 'guess', lambda path, create_watcher: None): + self.assertEqual(branch(segment_info=segment_info, status_colors=False), None) + with replace_attr(self.module, 'guess', get_dummy_guess(status=lambda: 'U')): + with replace_attr(self.module, 'tree_status', lambda repo, pl: 'U'): + self.assertEqual(branch(segment_info=segment_info, status_colors=False, ignore_statuses=['U']), [{ + 'highlight_groups': ['branch'], + 'contents': 'tests', + 'divider_highlight_group': None + }]) + self.assertEqual(branch(segment_info=segment_info, status_colors=True, ignore_statuses=['DU']), [{ + 'highlight_groups': ['branch_dirty', 'branch'], + 'contents': 'tests', + 'divider_highlight_group': None + }]) + self.assertEqual(branch(segment_info=segment_info, status_colors=True), [{ + 'highlight_groups': ['branch_dirty', 'branch'], + 'contents': 'tests', + 'divider_highlight_group': None + }]) + self.assertEqual(branch(segment_info=segment_info, status_colors=True, ignore_statuses=['U']), [{ + 'highlight_groups': ['branch_clean', 'branch'], + 'contents': 'tests', + 'divider_highlight_group': None + }]) + + def test_stash(self): + pl = Pl() + create_watcher = get_fallback_create_watcher() + stash = partial(self.module.stash, pl=pl, create_watcher=create_watcher, segment_info={'getcwd': os.getcwd}) + + def forge_stash(n): + return replace_attr(self.module, 'guess', get_dummy_guess(stash=lambda: n, directory='/tmp/tests')) + + with forge_stash(0): + self.assertEqual(stash(), None) + with forge_stash(1): + self.assertEqual(stash(), [{ + 'highlight_groups': ['stash'], + 'contents': '1', + 'divider_highlight_group': None + }]) + with forge_stash(2): + self.assertEqual(stash(), [{ + 'highlight_groups': ['stash'], + 'contents': '2', + 'divider_highlight_group': None + }]) + + +class TestTime(TestCommon): + module_name = 'time' + + def test_date(self): + pl = Pl() + with replace_attr(self.module, 'datetime', Args(now=lambda: Args(strftime=lambda fmt: fmt))): + self.assertEqual(self.module.date(pl=pl), [{'contents': '%Y-%m-%d', 'highlight_groups': ['date'], 'divider_highlight_group': None}]) + self.assertEqual(self.module.date(pl=pl, format='%H:%M', istime=True), [{'contents': '%H:%M', 'highlight_groups': ['time', 'date'], 'divider_highlight_group': 'time:divider'}]) + unicode_date = self.module.date(pl=pl, format='\u231a', istime=True) + expected_unicode_date = [{'contents': '\u231a', 'highlight_groups': ['time', 'date'], 'divider_highlight_group': 'time:divider'}] + if python_implementation() == 'PyPy' and sys.version_info >= (3,): + if unicode_date != expected_unicode_date: + raise SkipTest('Dates do not match, see https://bitbucket.org/pypy/pypy/issues/2161/pypy3-strftime-does-not-accept-unicode') + self.assertEqual(unicode_date, expected_unicode_date) + + def test_fuzzy_time(self): + time = Args(hour=0, minute=45) + pl = Pl() + with replace_attr(self.module, 'datetime', Args(now=lambda: time)): + self.assertEqual(self.module.fuzzy_time(pl=pl), 'quarter to one') + time.hour = 23 + time.minute = 59 + self.assertEqual(self.module.fuzzy_time(pl=pl), 'round about midnight') + time.minute = 33 + self.assertEqual(self.module.fuzzy_time(pl=pl), 'twenty-five to twelve') + time.minute = 60 + self.assertEqual(self.module.fuzzy_time(pl=pl), 'twelve o\'clock') + time.minute = 33 + self.assertEqual(self.module.fuzzy_time(pl=pl, unicode_text=False), 'twenty-five to twelve') + time.minute = 60 + self.assertEqual(self.module.fuzzy_time(pl=pl, unicode_text=False), 'twelve o\'clock') + time.minute = 33 + self.assertEqual(self.module.fuzzy_time(pl=pl, unicode_text=True), 'twenty‐five to twelve') + time.minute = 60 + self.assertEqual(self.module.fuzzy_time(pl=pl, unicode_text=True), 'twelve o’clock') + + +class TestSys(TestCommon): + module_name = 'sys' + + def test_uptime(self): + pl = Pl() + with replace_attr(self.module, '_get_uptime', lambda: 259200): + self.assertEqual(self.module.uptime(pl=pl), [{'contents': '3d', 'divider_highlight_group': 'background:divider'}]) + with replace_attr(self.module, '_get_uptime', lambda: 93784): + self.assertEqual(self.module.uptime(pl=pl), [{'contents': '1d 2h 3m', 'divider_highlight_group': 'background:divider'}]) + self.assertEqual(self.module.uptime(pl=pl, shorten_len=4), [{'contents': '1d 2h 3m 4s', 'divider_highlight_group': 'background:divider'}]) + with replace_attr(self.module, '_get_uptime', lambda: 65536): + self.assertEqual(self.module.uptime(pl=pl), [{'contents': '18h 12m 16s', 'divider_highlight_group': 'background:divider'}]) + self.assertEqual(self.module.uptime(pl=pl, shorten_len=2), [{'contents': '18h 12m', 'divider_highlight_group': 'background:divider'}]) + self.assertEqual(self.module.uptime(pl=pl, shorten_len=1), [{'contents': '18h', 'divider_highlight_group': 'background:divider'}]) + + def _get_uptime(): + raise NotImplementedError + + with replace_attr(self.module, '_get_uptime', _get_uptime): + self.assertEqual(self.module.uptime(pl=pl), None) + + def test_system_load(self): + pl = Pl() + with replace_module_module(self.module, 'os', getloadavg=lambda: (7.5, 3.5, 1.5)): + with replace_attr(self.module, '_cpu_count', lambda: 2): + self.assertEqual(self.module.system_load(pl=pl), [ + {'contents': '7.5 ', 'highlight_groups': ['system_load_gradient', 'system_load'], 'divider_highlight_group': 'background:divider', 'gradient_level': 100}, + {'contents': '3.5 ', 'highlight_groups': ['system_load_gradient', 'system_load'], 'divider_highlight_group': 'background:divider', 'gradient_level': 75.0}, + {'contents': '1.5', 'highlight_groups': ['system_load_gradient', 'system_load'], 'divider_highlight_group': 'background:divider', 'gradient_level': 0} + ]) + self.assertEqual(self.module.system_load(pl=pl, format='{avg:.0f}', threshold_good=0, threshold_bad=1), [ + {'contents': '8 ', 'highlight_groups': ['system_load_gradient', 'system_load'], 'divider_highlight_group': 'background:divider', 'gradient_level': 100}, + {'contents': '4 ', 'highlight_groups': ['system_load_gradient', 'system_load'], 'divider_highlight_group': 'background:divider', 'gradient_level': 100}, + {'contents': '2', 'highlight_groups': ['system_load_gradient', 'system_load'], 'divider_highlight_group': 'background:divider', 'gradient_level': 75.0} + ]) + self.assertEqual(self.module.system_load(pl=pl, short=True), [ + {'contents': '7.5', 'highlight_groups': ['system_load_gradient', 'system_load'], 'divider_highlight_group': 'background:divider', 'gradient_level': 100}, + ]) + self.assertEqual(self.module.system_load(pl=pl, format='{avg:.0f}', threshold_good=0, threshold_bad=1, short=True), [ + {'contents': '8', 'highlight_groups': ['system_load_gradient', 'system_load'], 'divider_highlight_group': 'background:divider', 'gradient_level': 100}, + ]) + + def test_cpu_load_percent(self): + try: + __import__('psutil') + except ImportError as e: + raise SkipTest('Failed to import psutil: {0}'.format(e)) + pl = Pl() + with replace_module_module(self.module, 'psutil', cpu_percent=lambda **kwargs: 52.3): + self.assertEqual(self.module.cpu_load_percent(pl=pl), [{ + 'contents': '52%', + 'gradient_level': 52.3, + 'highlight_groups': ['cpu_load_percent_gradient', 'cpu_load_percent'], + }]) + self.assertEqual(self.module.cpu_load_percent(pl=pl, format='{0:.1f}%'), [{ + 'contents': '52.3%', + 'gradient_level': 52.3, + 'highlight_groups': ['cpu_load_percent_gradient', 'cpu_load_percent'], + }]) + + +class TestWthr(TestCommon): + module_name = 'wthr' + + def test_weather(self): + pl = Pl() + with replace_attr(self.module, 'urllib_read', urllib_read): + self.assertEqual(self.module.weather(pl=pl), [ + {'divider_highlight_group': 'background:divider', 'highlight_groups': ['weather_condition_blustery', 'weather_condition_windy', 'weather_conditions', 'weather'], 'contents': 'WINDY '}, + {'divider_highlight_group': 'background:divider', 'highlight_groups': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '14°C', 'gradient_level': 62.857142857142854} + ]) + self.assertEqual(self.module.weather(pl=pl, temp_coldest=0, temp_hottest=100), [ + {'divider_highlight_group': 'background:divider', 'highlight_groups': ['weather_condition_blustery', 'weather_condition_windy', 'weather_conditions', 'weather'], 'contents': 'WINDY '}, + {'divider_highlight_group': 'background:divider', 'highlight_groups': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '14°C', 'gradient_level': 14.0} + ]) + self.assertEqual(self.module.weather(pl=pl, temp_coldest=-100, temp_hottest=-50), [ + {'divider_highlight_group': 'background:divider', 'highlight_groups': ['weather_condition_blustery', 'weather_condition_windy', 'weather_conditions', 'weather'], 'contents': 'WINDY '}, + {'divider_highlight_group': 'background:divider', 'highlight_groups': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '14°C', 'gradient_level': 100} + ]) + self.assertEqual(self.module.weather(pl=pl, icons={'blustery': 'o'}), [ + {'divider_highlight_group': 'background:divider', 'highlight_groups': ['weather_condition_blustery', 'weather_condition_windy', 'weather_conditions', 'weather'], 'contents': 'o '}, + {'divider_highlight_group': 'background:divider', 'highlight_groups': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '14°C', 'gradient_level': 62.857142857142854} + ]) + self.assertEqual(self.module.weather(pl=pl, icons={'windy': 'x'}), [ + {'divider_highlight_group': 'background:divider', 'highlight_groups': ['weather_condition_blustery', 'weather_condition_windy', 'weather_conditions', 'weather'], 'contents': 'x '}, + {'divider_highlight_group': 'background:divider', 'highlight_groups': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '14°C', 'gradient_level': 62.857142857142854} + ]) + self.assertEqual(self.module.weather(pl=pl, unit='F'), [ + {'divider_highlight_group': 'background:divider', 'highlight_groups': ['weather_condition_blustery', 'weather_condition_windy', 'weather_conditions', 'weather'], 'contents': 'WINDY '}, + {'divider_highlight_group': 'background:divider', 'highlight_groups': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '57°F', 'gradient_level': 62.857142857142854} + ]) + self.assertEqual(self.module.weather(pl=pl, unit='K'), [ + {'divider_highlight_group': 'background:divider', 'highlight_groups': ['weather_condition_blustery', 'weather_condition_windy', 'weather_conditions', 'weather'], 'contents': 'WINDY '}, + {'divider_highlight_group': 'background:divider', 'highlight_groups': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '287K', 'gradient_level': 62.857142857142854} + ]) + self.assertEqual(self.module.weather(pl=pl, temp_format='{temp:.1e}C'), [ + {'divider_highlight_group': 'background:divider', 'highlight_groups': ['weather_condition_blustery', 'weather_condition_windy', 'weather_conditions', 'weather'], 'contents': 'WINDY '}, + {'divider_highlight_group': 'background:divider', 'highlight_groups': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '1.4e+01C', 'gradient_level': 62.857142857142854} + ]) + with replace_attr(self.module, 'urllib_read', urllib_read): + self.module.weather.startup(pl=pl, location_query='Meppen,06,DE') + self.assertEqual(self.module.weather(pl=pl), [ + {'divider_highlight_group': 'background:divider', 'highlight_groups': ['weather_condition_blustery', 'weather_condition_windy', 'weather_conditions', 'weather'], 'contents': 'WINDY '}, + {'divider_highlight_group': 'background:divider', 'highlight_groups': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '14°C', 'gradient_level': 62.857142857142854} + ]) + self.assertEqual(self.module.weather(pl=pl, location_query='Moscow,RU'), [ + {'divider_highlight_group': 'background:divider', 'highlight_groups': ['weather_condition_fair_night', 'weather_condition_night', 'weather_conditions', 'weather'], 'contents': 'NIGHT '}, + {'divider_highlight_group': 'background:divider', 'highlight_groups': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '9°C', 'gradient_level': 55.714285714285715} + ]) + self.module.weather.shutdown() + + +class TestI3WM(TestCase): + @staticmethod + def get_workspaces(): + return iter([ + {'name': '1: w1', 'output': 'LVDS1', 'focused': False, 'urgent': False, 'visible': False}, + {'name': '2: w2', 'output': 'LVDS1', 'focused': False, 'urgent': False, 'visible': True}, + {'name': '3: w3', 'output': 'HDMI1', 'focused': False, 'urgent': True, 'visible': True}, + {'name': '4: w4', 'output': 'DVI01', 'focused': True, 'urgent': True, 'visible': True}, + ]) + + def test_workspaces(self): + pl = Pl() + with replace_attr(i3wm, 'get_i3_connection', lambda: Args(get_workspaces=self.get_workspaces)): + segment_info = {} + + self.assertEqual(i3wm.workspaces(pl=pl, segment_info=segment_info), [ + {'contents': '1: w1', 'highlight_groups': ['workspace']}, + {'contents': '2: w2', 'highlight_groups': ['w_visible', 'workspace']}, + {'contents': '3: w3', 'highlight_groups': ['w_urgent', 'w_visible', 'workspace']}, + {'contents': '4: w4', 'highlight_groups': ['w_focused', 'w_urgent', 'w_visible', 'workspace']}, + ]) + self.assertEqual(i3wm.workspaces(pl=pl, segment_info=segment_info, only_show=None), [ + {'contents': '1: w1', 'highlight_groups': ['workspace']}, + {'contents': '2: w2', 'highlight_groups': ['w_visible', 'workspace']}, + {'contents': '3: w3', 'highlight_groups': ['w_urgent', 'w_visible', 'workspace']}, + {'contents': '4: w4', 'highlight_groups': ['w_focused', 'w_urgent', 'w_visible', 'workspace']}, + ]) + self.assertEqual(i3wm.workspaces(pl=pl, segment_info=segment_info, only_show=['focused', 'urgent']), [ + {'contents': '3: w3', 'highlight_groups': ['w_urgent', 'w_visible', 'workspace']}, + {'contents': '4: w4', 'highlight_groups': ['w_focused', 'w_urgent', 'w_visible', 'workspace']}, + ]) + self.assertEqual(i3wm.workspaces(pl=pl, segment_info=segment_info, only_show=['visible']), [ + {'contents': '2: w2', 'highlight_groups': ['w_visible', 'workspace']}, + {'contents': '3: w3', 'highlight_groups': ['w_urgent', 'w_visible', 'workspace']}, + {'contents': '4: w4', 'highlight_groups': ['w_focused', 'w_urgent', 'w_visible', 'workspace']}, + ]) + self.assertEqual(i3wm.workspaces(pl=pl, segment_info=segment_info, only_show=['visible'], strip=3), [ + {'contents': 'w2', 'highlight_groups': ['w_visible', 'workspace']}, + {'contents': 'w3', 'highlight_groups': ['w_urgent', 'w_visible', 'workspace']}, + {'contents': 'w4', 'highlight_groups': ['w_focused', 'w_urgent', 'w_visible', 'workspace']}, + ]) + self.assertEqual(i3wm.workspaces(pl=pl, segment_info=segment_info, only_show=['focused', 'urgent'], output='DVI01'), [ + {'contents': '4: w4', 'highlight_groups': ['w_focused', 'w_urgent', 'w_visible', 'workspace']}, + ]) + self.assertEqual(i3wm.workspaces(pl=pl, segment_info=segment_info, only_show=['visible'], output='HDMI1'), [ + {'contents': '3: w3', 'highlight_groups': ['w_urgent', 'w_visible', 'workspace']}, + ]) + self.assertEqual(i3wm.workspaces(pl=pl, segment_info=segment_info, only_show=['visible'], strip=3, output='LVDS1'), [ + {'contents': 'w2', 'highlight_groups': ['w_visible', 'workspace']}, + ]) + segment_info['output'] = 'LVDS1' + self.assertEqual(i3wm.workspaces(pl=pl, segment_info=segment_info, only_show=['visible'], output='HDMI1'), [ + {'contents': '3: w3', 'highlight_groups': ['w_urgent', 'w_visible', 'workspace']}, + ]) + self.assertEqual(i3wm.workspaces(pl=pl, segment_info=segment_info, only_show=['visible'], strip=3), [ + {'contents': 'w2', 'highlight_groups': ['w_visible', 'workspace']}, + ]) + + def test_workspace(self): + pl = Pl() + with replace_attr(i3wm, 'get_i3_connection', lambda: Args(get_workspaces=self.get_workspaces)): + segment_info = {} + + self.assertEqual(i3wm.workspace(pl=pl, segment_info=segment_info, workspace='1: w1'), [ + {'contents': '1: w1', 'highlight_groups': ['workspace']}, + ]) + self.assertEqual(i3wm.workspace(pl=pl, segment_info=segment_info, workspace='3: w3', strip=True), [ + {'contents': 'w3', 'highlight_groups': ['w_urgent', 'w_visible', 'workspace']}, + ]) + self.assertEqual(i3wm.workspace(pl=pl, segment_info=segment_info, workspace='9: w9'), None) + self.assertEqual(i3wm.workspace(pl=pl, segment_info=segment_info), [ + {'contents': '4: w4', 'highlight_groups': ['w_focused', 'w_urgent', 'w_visible', 'workspace']}, + ]) + segment_info['workspace'] = next(self.get_workspaces()) + self.assertEqual(i3wm.workspace(pl=pl, segment_info=segment_info, workspace='4: w4'), [ + {'contents': '4: w4', 'highlight_groups': ['w_focused', 'w_urgent', 'w_visible', 'workspace']}, + ]) + self.assertEqual(i3wm.workspace(pl=pl, segment_info=segment_info, strip=True), [ + {'contents': 'w1', 'highlight_groups': ['workspace']}, + ]) + + def test_mode(self): + pl = Pl() + self.assertEqual(i3wm.mode(pl=pl, segment_info={'mode': 'default'}), None) + self.assertEqual(i3wm.mode(pl=pl, segment_info={'mode': 'test'}), 'test') + self.assertEqual(i3wm.mode(pl=pl, segment_info={'mode': 'default'}, names={'default': 'test'}), 'test') + self.assertEqual(i3wm.mode(pl=pl, segment_info={'mode': 'test'}, names={'default': 'test', 'test': 't'}), 't') + + def test_scratchpad(self): + class Conn(object): + def get_tree(self): + return self + + def descendents(self): + nodes_unfocused = [Args(focused = False)] + nodes_focused = [Args(focused = True)] + + workspace_scratch = lambda: Args(name='__i3_scratch') + workspace_noscratch = lambda: Args(name='2: www') + return [ + Args(scratchpad_state='fresh', urgent=False, workspace=workspace_scratch, nodes=nodes_unfocused), + Args(scratchpad_state='changed', urgent=True, workspace=workspace_noscratch, nodes=nodes_focused), + Args(scratchpad_state='fresh', urgent=False, workspace=workspace_scratch, nodes=nodes_unfocused), + Args(scratchpad_state=None, urgent=False, workspace=workspace_noscratch, nodes=nodes_unfocused), + Args(scratchpad_state='fresh', urgent=False, workspace=workspace_scratch, nodes=nodes_focused), + Args(scratchpad_state=None, urgent=True, workspace=workspace_noscratch, nodes=nodes_unfocused), + ] + + pl = Pl() + with replace_attr(i3wm, 'get_i3_connection', lambda: Conn()): + self.assertEqual(i3wm.scratchpad(pl=pl), [ + {'contents': 'O', 'highlight_groups': ['scratchpad']}, + {'contents': 'X', 'highlight_groups': ['scratchpad:urgent', 'scratchpad:focused', 'scratchpad:visible', 'scratchpad']}, + {'contents': 'O', 'highlight_groups': ['scratchpad']}, + {'contents': 'X', 'highlight_groups': ['scratchpad:visible', 'scratchpad']}, + {'contents': 'O', 'highlight_groups': ['scratchpad:focused', 'scratchpad']}, + {'contents': 'X', 'highlight_groups': ['scratchpad:urgent', 'scratchpad:visible', 'scratchpad']}, + ]) + self.assertEqual(i3wm.scratchpad(pl=pl, icons={'changed': '-', 'fresh': 'o'}), [ + {'contents': 'o', 'highlight_groups': ['scratchpad']}, + {'contents': '-', 'highlight_groups': ['scratchpad:urgent', 'scratchpad:focused', 'scratchpad:visible', 'scratchpad']}, + {'contents': 'o', 'highlight_groups': ['scratchpad']}, + {'contents': '-', 'highlight_groups': ['scratchpad:visible', 'scratchpad']}, + {'contents': 'o', 'highlight_groups': ['scratchpad:focused', 'scratchpad']}, + {'contents': '-', 'highlight_groups': ['scratchpad:urgent', 'scratchpad:visible', 'scratchpad']}, + ]) + + +class TestMail(TestCommon): + module_name = 'mail' + + def test_email_imap_alert(self): + # TODO + pass + + +class TestPlayers(TestCommon): + module_name = 'players' + + def test_now_playing(self): + # TODO + pass + + +class TestBat(TestCommon): + module_name = 'bat' + + def test_battery(self): + pl = Pl() + + def _get_battery_status(pl): + return 86, False + + with replace_attr(self.module, '_get_battery_status', _get_battery_status): + self.assertEqual(self.module.battery(pl=pl), [{ + 'contents': ' 86%', + 'highlight_groups': ['battery_gradient', 'battery'], + 'gradient_level': 14, + }]) + self.assertEqual(self.module.battery(pl=pl, format='{capacity:.2f}'), [{ + 'contents': '0.86', + 'highlight_groups': ['battery_gradient', 'battery'], + 'gradient_level': 14, + }]) + self.assertEqual(self.module.battery(pl=pl, steps=7), [{ + 'contents': ' 86%', + 'highlight_groups': ['battery_gradient', 'battery'], + 'gradient_level': 14, + }]) + self.assertEqual(self.module.battery(pl=pl, gamify=True), [ + { + 'contents': ' ', + 'draw_inner_divider': False, + 'highlight_groups': ['battery_offline', 'battery_ac_state', 'battery_gradient', 'battery'], + 'gradient_level': 0 + }, + { + 'contents': 'OOOO', + 'draw_inner_divider': False, + 'highlight_groups': ['battery_full', 'battery_gradient', 'battery'], + 'gradient_level': 0 + }, + { + 'contents': 'O', + 'draw_inner_divider': False, + 'highlight_groups': ['battery_empty', 'battery_gradient', 'battery'], + 'gradient_level': 100 + } + ]) + self.assertEqual(self.module.battery(pl=pl, gamify=True, full_heart='+', empty_heart='-', steps='10'), [ + { + 'contents': ' ', + 'draw_inner_divider': False, + 'highlight_groups': ['battery_offline', 'battery_ac_state', 'battery_gradient', 'battery'], + 'gradient_level': 0 + }, + { + 'contents': '++++++++', + 'draw_inner_divider': False, + 'highlight_groups': ['battery_full', 'battery_gradient', 'battery'], + 'gradient_level': 0 + }, + { + 'contents': '--', + 'draw_inner_divider': False, + 'highlight_groups': ['battery_empty', 'battery_gradient', 'battery'], + 'gradient_level': 100 + } + ]) + + def test_battery_with_ac_online(self): + pl = Pl() + + def _get_battery_status(pl): + return 86, True + + with replace_attr(self.module, '_get_battery_status', _get_battery_status): + self.assertEqual(self.module.battery(pl=pl, online='C', offline=' '), [ + { + 'contents': 'C 86%', + 'highlight_groups': ['battery_gradient', 'battery'], + 'gradient_level': 14, + }]) + + def test_battery_with_ac_offline(self): + pl = Pl() + + def _get_battery_status(pl): + return 86, False + + with replace_attr(self.module, '_get_battery_status', _get_battery_status): + self.assertEqual(self.module.battery(pl=pl, online='C', offline=' '), [ + { + 'contents': ' 86%', + 'highlight_groups': ['battery_gradient', 'battery'], + 'gradient_level': 14, + }]) + + +class TestVim(TestCase): + def test_mode(self): + pl = Pl() + segment_info = vim_module._get_segment_info() + self.assertEqual(self.vim.mode(pl=pl, segment_info=segment_info), 'NORMAL') + self.assertEqual(self.vim.mode(pl=pl, segment_info=segment_info, override={'i': 'INS'}), 'NORMAL') + self.assertEqual(self.vim.mode(pl=pl, segment_info=segment_info, override={'n': 'NORM'}), 'NORM') + with vim_module._with('mode', 'i') as segment_info: + self.assertEqual(self.vim.mode(pl=pl, segment_info=segment_info), 'INSERT') + with vim_module._with('mode', 'i\0') as segment_info: + self.assertEqual(self.vim.mode(pl=pl, segment_info=segment_info), 'INSERT') + with vim_module._with('mode', chr(ord('V') - 0x40)) as segment_info: + self.assertEqual(self.vim.mode(pl=pl, segment_info=segment_info), 'V-BLCK') + self.assertEqual(self.vim.mode(pl=pl, segment_info=segment_info, override={'^V': 'VBLK'}), 'VBLK') + + def test_visual_range(self): + pl = Pl() + vr = partial(self.vim.visual_range, pl=pl) + vim_module.current.window.cursor = [0, 0] + try: + with vim_module._with('mode', 'i') as segment_info: + self.assertEqual(vr(segment_info=segment_info), '') + with vim_module._with('mode', '^V') as segment_info: + self.assertEqual(vr(segment_info=segment_info), '1 x 1') + with vim_module._with('vpos', line=5, col=5, off=0): + self.assertEqual(vr(segment_info=segment_info), '5 x 5') + with vim_module._with('vpos', line=5, col=4, off=0): + self.assertEqual(vr(segment_info=segment_info), '5 x 4') + with vim_module._with('mode', '^S') as segment_info: + self.assertEqual(vr(segment_info=segment_info), '1 x 1') + with vim_module._with('vpos', line=5, col=5, off=0): + self.assertEqual(vr(segment_info=segment_info), '5 x 5') + with vim_module._with('vpos', line=5, col=4, off=0): + self.assertEqual(vr(segment_info=segment_info), '5 x 4') + with vim_module._with('mode', 'V') as segment_info: + self.assertEqual(vr(segment_info=segment_info), 'L:1') + with vim_module._with('vpos', line=5, col=5, off=0): + self.assertEqual(vr(segment_info=segment_info), 'L:5') + with vim_module._with('vpos', line=5, col=4, off=0): + self.assertEqual(vr(segment_info=segment_info), 'L:5') + with vim_module._with('mode', 'S') as segment_info: + self.assertEqual(vr(segment_info=segment_info), 'L:1') + with vim_module._with('vpos', line=5, col=5, off=0): + self.assertEqual(vr(segment_info=segment_info), 'L:5') + with vim_module._with('vpos', line=5, col=4, off=0): + self.assertEqual(vr(segment_info=segment_info), 'L:5') + with vim_module._with('mode', 'v') as segment_info: + self.assertEqual(vr(segment_info=segment_info), 'C:1') + with vim_module._with('vpos', line=5, col=5, off=0): + self.assertEqual(vr(segment_info=segment_info), 'L:5') + with vim_module._with('vpos', line=5, col=4, off=0): + self.assertEqual(vr(segment_info=segment_info), 'L:5') + with vim_module._with('mode', 's') as segment_info: + self.assertEqual(vr(segment_info=segment_info), 'C:1') + with vim_module._with('vpos', line=5, col=5, off=0): + self.assertEqual(vr(segment_info=segment_info), 'L:5') + with vim_module._with('vpos', line=5, col=4, off=0): + self.assertEqual(vr(segment_info=segment_info), 'L:5') + finally: + vim_module._close(1) + + def test_modified_indicator(self): + pl = Pl() + segment_info = vim_module._get_segment_info() + self.assertEqual(self.vim.modified_indicator(pl=pl, segment_info=segment_info), None) + segment_info['buffer'][0] = 'abc' + try: + self.assertEqual(self.vim.modified_indicator(pl=pl, segment_info=segment_info), '+') + self.assertEqual(self.vim.modified_indicator(pl=pl, segment_info=segment_info, text='-'), '-') + finally: + vim_module._bw(segment_info['bufnr']) + + def test_paste_indicator(self): + pl = Pl() + segment_info = vim_module._get_segment_info() + self.assertEqual(self.vim.paste_indicator(pl=pl, segment_info=segment_info), None) + with vim_module._with('options', paste=1): + self.assertEqual(self.vim.paste_indicator(pl=pl, segment_info=segment_info), 'PASTE') + self.assertEqual(self.vim.paste_indicator(pl=pl, segment_info=segment_info, text='P'), 'P') + + def test_readonly_indicator(self): + pl = Pl() + segment_info = vim_module._get_segment_info() + self.assertEqual(self.vim.readonly_indicator(pl=pl, segment_info=segment_info), None) + with vim_module._with('bufoptions', readonly=1): + self.assertEqual(self.vim.readonly_indicator(pl=pl, segment_info=segment_info), 'RO') + self.assertEqual(self.vim.readonly_indicator(pl=pl, segment_info=segment_info, text='L'), 'L') + + def test_file_scheme(self): + pl = Pl() + segment_info = vim_module._get_segment_info() + self.assertEqual(self.vim.file_scheme(pl=pl, segment_info=segment_info), None) + with vim_module._with('buffer', '/tmp/’’/abc') as segment_info: + self.assertEqual(self.vim.file_scheme(pl=pl, segment_info=segment_info), None) + with vim_module._with('buffer', 'zipfile:/tmp/abc.zip::abc/abc.vim') as segment_info: + self.assertEqual(self.vim.file_scheme(pl=pl, segment_info=segment_info), 'zipfile') + + def test_file_directory(self): + pl = Pl() + segment_info = vim_module._get_segment_info() + self.assertEqual(self.vim.file_directory(pl=pl, segment_info=segment_info), None) + with replace_env('HOME', '/home/foo', os.environ): + with vim_module._with('buffer', '/tmp/’’/abc') as segment_info: + self.assertEqual(self.vim.file_directory(pl=pl, segment_info=segment_info), '/tmp/’’/') + with vim_module._with('buffer', b'/tmp/\xFF\xFF/abc') as segment_info: + self.assertEqual(self.vim.file_directory(pl=pl, segment_info=segment_info), '/tmp/<ff><ff>/') + with vim_module._with('buffer', '/tmp/abc') as segment_info: + self.assertEqual(self.vim.file_directory(pl=pl, segment_info=segment_info), '/tmp/') + os.environ['HOME'] = '/tmp' + self.assertEqual(self.vim.file_directory(pl=pl, segment_info=segment_info), '~/') + with vim_module._with('buffer', 'zipfile:/tmp/abc.zip::abc/abc.vim') as segment_info: + self.assertEqual(self.vim.file_directory(pl=pl, segment_info=segment_info, remove_scheme=False), 'zipfile:/tmp/abc.zip::abc/') + self.assertEqual(self.vim.file_directory(pl=pl, segment_info=segment_info, remove_scheme=True), '/tmp/abc.zip::abc/') + self.assertEqual(self.vim.file_directory(pl=pl, segment_info=segment_info), '/tmp/abc.zip::abc/') + os.environ['HOME'] = '/tmp' + self.assertEqual(self.vim.file_directory(pl=pl, segment_info=segment_info, remove_scheme=False), 'zipfile:/tmp/abc.zip::abc/') + self.assertEqual(self.vim.file_directory(pl=pl, segment_info=segment_info, remove_scheme=True), '/tmp/abc.zip::abc/') + self.assertEqual(self.vim.file_directory(pl=pl, segment_info=segment_info), '/tmp/abc.zip::abc/') + + def test_file_name(self): + pl = Pl() + segment_info = vim_module._get_segment_info() + self.assertEqual(self.vim.file_name(pl=pl, segment_info=segment_info), None) + self.assertEqual(self.vim.file_name(pl=pl, segment_info=segment_info, display_no_file=True), [ + {'contents': '[No file]', 'highlight_groups': ['file_name_no_file', 'file_name']} + ]) + self.assertEqual(self.vim.file_name(pl=pl, segment_info=segment_info, display_no_file=True, no_file_text='X'), [ + {'contents': 'X', 'highlight_groups': ['file_name_no_file', 'file_name']} + ]) + with vim_module._with('buffer', '/tmp/abc') as segment_info: + self.assertEqual(self.vim.file_name(pl=pl, segment_info=segment_info), 'abc') + with vim_module._with('buffer', '/tmp/’’') as segment_info: + self.assertEqual(self.vim.file_name(pl=pl, segment_info=segment_info), '’’') + with vim_module._with('buffer', b'/tmp/\xFF\xFF') as segment_info: + self.assertEqual(self.vim.file_name(pl=pl, segment_info=segment_info), '<ff><ff>') + + def test_file_size(self): + pl = Pl() + segment_info = vim_module._get_segment_info() + self.assertEqual(self.vim.file_size(pl=pl, segment_info=segment_info), '0 B') + with vim_module._with( + 'buffer', + os.path.join( + os.path.dirname(os.path.dirname(__file__)), 'empty') + ) as segment_info: + self.assertEqual(self.vim.file_size(pl=pl, segment_info=segment_info), '0 B') + + def test_file_opts(self): + pl = Pl() + segment_info = vim_module._get_segment_info() + self.assertEqual(self.vim.file_format(pl=pl, segment_info=segment_info), [ + {'divider_highlight_group': 'background:divider', 'contents': 'unix'} + ]) + self.assertEqual(self.vim.file_encoding(pl=pl, segment_info=segment_info), [ + {'divider_highlight_group': 'background:divider', 'contents': 'utf-8'} + ]) + self.assertEqual(self.vim.file_type(pl=pl, segment_info=segment_info), None) + with vim_module._with('bufoptions', filetype='python'): + self.assertEqual(self.vim.file_type(pl=pl, segment_info=segment_info), [ + {'divider_highlight_group': 'background:divider', 'contents': 'python'} + ]) + + def test_window_title(self): + pl = Pl() + segment_info = vim_module._get_segment_info() + self.assertEqual(self.vim.window_title(pl=pl, segment_info=segment_info), None) + with vim_module._with('wvars', quickfix_title='Abc'): + self.assertEqual(self.vim.window_title(pl=pl, segment_info=segment_info), 'Abc') + + def test_line_percent(self): + pl = Pl() + segment_info = vim_module._get_segment_info() + segment_info['buffer'][0:-1] = [str(i) for i in range(100)] + try: + self.assertEqual(self.vim.line_percent(pl=pl, segment_info=segment_info), '1') + vim_module._set_cursor(50, 0) + self.assertEqual(self.vim.line_percent(pl=pl, segment_info=segment_info), '50') + self.assertEqual(self.vim.line_percent(pl=pl, segment_info=segment_info, gradient=True), [ + {'contents': '50', 'highlight_groups': ['line_percent_gradient', 'line_percent'], 'gradient_level': 50 * 100.0 / 101} + ]) + finally: + vim_module._bw(segment_info['bufnr']) + + def test_line_count(self): + pl = Pl() + segment_info = vim_module._get_segment_info() + segment_info['buffer'][0:-1] = [str(i) for i in range(99)] + try: + self.assertEqual(self.vim.line_count(pl=pl, segment_info=segment_info), '100') + vim_module._set_cursor(50, 0) + self.assertEqual(self.vim.line_count(pl=pl, segment_info=segment_info), '100') + finally: + vim_module._bw(segment_info['bufnr']) + + def test_position(self): + pl = Pl() + segment_info = vim_module._get_segment_info() + try: + segment_info['buffer'][0:-1] = [str(i) for i in range(99)] + vim_module._set_cursor(49, 0) + self.assertEqual(self.vim.position(pl=pl, segment_info=segment_info), '50%') + self.assertEqual(self.vim.position(pl=pl, segment_info=segment_info, gradient=True), [ + {'contents': '50%', 'highlight_groups': ['position_gradient', 'position'], 'gradient_level': 50.0} + ]) + vim_module._set_cursor(0, 0) + self.assertEqual(self.vim.position(pl=pl, segment_info=segment_info), 'Top') + vim_module._set_cursor(97, 0) + self.assertEqual(self.vim.position(pl=pl, segment_info=segment_info, position_strings={'top': 'Comienzo', 'bottom': 'Final', 'all': 'Todo'}), 'Final') + segment_info['buffer'][0:-1] = [str(i) for i in range(2)] + vim_module._set_cursor(0, 0) + self.assertEqual(self.vim.position(pl=pl, segment_info=segment_info, position_strings={'top': 'Comienzo', 'bottom': 'Final', 'all': 'Todo'}), 'Todo') + self.assertEqual(self.vim.position(pl=pl, segment_info=segment_info, gradient=True), [ + {'contents': 'All', 'highlight_groups': ['position_gradient', 'position'], 'gradient_level': 0.0} + ]) + finally: + vim_module._bw(segment_info['bufnr']) + + def test_cursor_current(self): + pl = Pl() + segment_info = vim_module._get_segment_info() + self.assertEqual(self.vim.line_current(pl=pl, segment_info=segment_info), '1') + self.assertEqual(self.vim.col_current(pl=pl, segment_info=segment_info), '1') + self.assertEqual(self.vim.virtcol_current(pl=pl, segment_info=segment_info), [{ + 'highlight_groups': ['virtcol_current_gradient', 'virtcol_current', 'col_current'], 'contents': '1', 'gradient_level': 100.0 / 80, + }]) + self.assertEqual(self.vim.virtcol_current(pl=pl, segment_info=segment_info, gradient=False), [{ + 'highlight_groups': ['virtcol_current', 'col_current'], 'contents': '1', + }]) + + def test_modified_buffers(self): + pl = Pl() + self.assertEqual(self.vim.modified_buffers(pl=pl), None) + + def test_branch(self): + pl = Pl() + create_watcher = get_fallback_create_watcher() + branch = partial(self.vim.branch, pl=pl, create_watcher=create_watcher) + with vim_module._with('buffer', '/foo') as segment_info: + with replace_attr(self.vcs, 'guess', get_dummy_guess(status=lambda: None)): + with replace_attr(self.vcs, 'tree_status', lambda repo, pl: None): + self.assertEqual(branch(segment_info=segment_info, status_colors=False), [ + {'divider_highlight_group': 'branch:divider', 'highlight_groups': ['branch'], 'contents': 'foo'} + ]) + self.assertEqual(branch(segment_info=segment_info, status_colors=True), [ + {'divider_highlight_group': 'branch:divider', 'highlight_groups': ['branch_clean', 'branch'], 'contents': 'foo'} + ]) + with replace_attr(self.vcs, 'guess', get_dummy_guess(status=lambda: 'DU')): + with replace_attr(self.vcs, 'tree_status', lambda repo, pl: 'DU'): + self.assertEqual(branch(segment_info=segment_info, status_colors=False), [ + {'divider_highlight_group': 'branch:divider', 'highlight_groups': ['branch'], 'contents': 'foo'} + ]) + self.assertEqual(branch(segment_info=segment_info, status_colors=True), [ + {'divider_highlight_group': 'branch:divider', 'highlight_groups': ['branch_dirty', 'branch'], 'contents': 'foo'} + ]) + with replace_attr(self.vcs, 'guess', get_dummy_guess(status=lambda: 'U')): + with replace_attr(self.vcs, 'tree_status', lambda repo, pl: 'U'): + self.assertEqual(branch(segment_info=segment_info, status_colors=False, ignore_statuses=['U']), [ + {'divider_highlight_group': 'branch:divider', 'highlight_groups': ['branch'], 'contents': 'foo'} + ]) + self.assertEqual(branch(segment_info=segment_info, status_colors=True, ignore_statuses=['DU']), [ + {'divider_highlight_group': 'branch:divider', 'highlight_groups': ['branch_dirty', 'branch'], 'contents': 'foo'} + ]) + self.assertEqual(branch(segment_info=segment_info, status_colors=True), [ + {'divider_highlight_group': 'branch:divider', 'highlight_groups': ['branch_dirty', 'branch'], 'contents': 'foo'} + ]) + self.assertEqual(branch(segment_info=segment_info, status_colors=True, ignore_statuses=['U']), [ + {'divider_highlight_group': 'branch:divider', 'highlight_groups': ['branch_clean', 'branch'], 'contents': 'foo'} + ]) + + def test_stash(self): + pl = Pl() + create_watcher = get_fallback_create_watcher() + with vim_module._with('buffer', '/foo') as segment_info: + stash = partial(self.vim.stash, pl=pl, create_watcher=create_watcher, segment_info=segment_info) + + def forge_stash(n): + return replace_attr(self.vcs, 'guess', get_dummy_guess(stash=lambda: n)) + + with forge_stash(0): + self.assertEqual(stash(), None) + with forge_stash(1): + self.assertEqual(stash(), [{ + 'divider_highlight_group': 'stash:divider', + 'highlight_groups': ['stash'], + 'contents': '1' + }]) + with forge_stash(2): + self.assertEqual(stash(), [{ + 'divider_highlight_group': 'stash:divider', + 'highlight_groups': ['stash'], + 'contents': '2' + }]) + + def test_file_vcs_status(self): + pl = Pl() + create_watcher = get_fallback_create_watcher() + file_vcs_status = partial(self.vim.file_vcs_status, pl=pl, create_watcher=create_watcher) + with vim_module._with('buffer', '/foo') as segment_info: + with replace_attr(self.vim, 'guess', get_dummy_guess(status=lambda file: 'M')): + self.assertEqual(file_vcs_status(segment_info=segment_info), [ + {'highlight_groups': ['file_vcs_status_M', 'file_vcs_status'], 'contents': 'M'} + ]) + with replace_attr(self.vim, 'guess', get_dummy_guess(status=lambda file: None)): + self.assertEqual(file_vcs_status(segment_info=segment_info), None) + with vim_module._with('buffer', '/bar') as segment_info: + with vim_module._with('bufoptions', buftype='nofile'): + with replace_attr(self.vim, 'guess', get_dummy_guess(status=lambda file: 'M')): + self.assertEqual(file_vcs_status(segment_info=segment_info), None) + + def test_trailing_whitespace(self): + pl = Pl() + with vim_module._with('buffer', 'tws') as segment_info: + trailing_whitespace = partial(self.vim.trailing_whitespace, pl=pl, segment_info=segment_info) + self.assertEqual(trailing_whitespace(), None) + self.assertEqual(trailing_whitespace(), None) + vim_module.current.buffer[0] = ' ' + self.assertEqual(trailing_whitespace(), [{ + 'highlight_groups': ['trailing_whitespace', 'warning'], + 'contents': '1', + }]) + self.assertEqual(trailing_whitespace(), [{ + 'highlight_groups': ['trailing_whitespace', 'warning'], + 'contents': '1', + }]) + vim_module.current.buffer[0] = '' + self.assertEqual(trailing_whitespace(), None) + self.assertEqual(trailing_whitespace(), None) + + def test_tabnr(self): + pl = Pl() + segment_info = vim_module._get_segment_info() + self.assertEqual(self.vim.tabnr(pl=pl, segment_info=segment_info, show_current=True), '1') + self.assertEqual(self.vim.tabnr(pl=pl, segment_info=segment_info, show_current=False), None) + + def test_tab(self): + pl = Pl() + segment_info = vim_module._get_segment_info() + self.assertEqual(self.vim.tab(pl=pl, segment_info=segment_info), [{ + 'contents': None, + 'literal_contents': (0, '%1T'), + }]) + self.assertEqual(self.vim.tab(pl=pl, segment_info=segment_info, end=True), [{ + 'contents': None, + 'literal_contents': (0, '%T'), + }]) + + def test_bufnr(self): + pl = Pl() + segment_info = vim_module._get_segment_info() + self.assertEqual(self.vim.bufnr(pl=pl, segment_info=segment_info, show_current=True), str(segment_info['bufnr'])) + self.assertEqual(self.vim.bufnr(pl=pl, segment_info=segment_info, show_current=False), None) + + def test_winnr(self): + pl = Pl() + segment_info = vim_module._get_segment_info() + self.assertEqual(self.vim.winnr(pl=pl, segment_info=segment_info, show_current=True), str(segment_info['winnr'])) + self.assertEqual(self.vim.winnr(pl=pl, segment_info=segment_info, show_current=False), None) + + def test_segment_info(self): + pl = Pl() + with vim_module._with('tabpage'): + with vim_module._with('buffer', '1') as segment_info: + self.assertEqual(self.vim.tab_modified_indicator(pl=pl, segment_info=segment_info), None) + vim_module.current.buffer[0] = ' ' + self.assertEqual(self.vim.tab_modified_indicator(pl=pl, segment_info=segment_info), [{ + 'contents': '+', + 'highlight_groups': ['tab_modified_indicator', 'modified_indicator'], + }]) + vim_module._undo() + self.assertEqual(self.vim.tab_modified_indicator(pl=pl, segment_info=segment_info), None) + old_buffer = vim_module.current.buffer + vim_module._new('2') + segment_info = vim_module._get_segment_info() + self.assertEqual(self.vim.tab_modified_indicator(pl=pl, segment_info=segment_info), None) + old_buffer[0] = ' ' + self.assertEqual(self.vim.modified_indicator(pl=pl, segment_info=segment_info), None) + self.assertEqual(self.vim.tab_modified_indicator(pl=pl, segment_info=segment_info), [{ + 'contents': '+', + 'highlight_groups': ['tab_modified_indicator', 'modified_indicator'], + }]) + + def test_csv_col_current(self): + pl = Pl() + segment_info = vim_module._get_segment_info() + + def csv_col_current(**kwargs): + self.vim.csv_cache and self.vim.csv_cache.clear() + return self.vim.csv_col_current(pl=pl, segment_info=segment_info, **kwargs) + + buffer = segment_info['buffer'] + try: + self.assertEqual(csv_col_current(), None) + buffer.options['filetype'] = 'csv' + self.assertEqual(csv_col_current(), None) + buffer[:] = ['1;2;3', '4;5;6'] + vim_module._set_cursor(1, 1) + self.assertEqual(csv_col_current(), [{ + 'contents': '1', 'highlight_groups': ['csv:column_number', 'csv'] + }]) + vim_module._set_cursor(2, 3) + self.assertEqual(csv_col_current(), [{ + 'contents': '2', 'highlight_groups': ['csv:column_number', 'csv'] + }]) + vim_module._set_cursor(2, 3) + self.assertEqual(csv_col_current(display_name=True), [{ + 'contents': '2', 'highlight_groups': ['csv:column_number', 'csv'] + }, { + 'contents': ' (2)', 'highlight_groups': ['csv:column_name', 'csv'] + }]) + buffer[:0] = ['Foo;Bar;Baz'] + vim_module._set_cursor(2, 3) + self.assertEqual(csv_col_current(), [{ + 'contents': '2', 'highlight_groups': ['csv:column_number', 'csv'] + }, { + 'contents': ' (Bar)', 'highlight_groups': ['csv:column_name', 'csv'] + }]) + if sys.version_info < (2, 7): + raise SkipTest('csv module in Python-2.6 does not handle multiline csv files well') + buffer[len(buffer):] = ['1;"bc', 'def', 'ghi', 'jkl";3'] + vim_module._set_cursor(5, 1) + self.assertEqual(csv_col_current(), [{ + 'contents': '2', 'highlight_groups': ['csv:column_number', 'csv'] + }, { + 'contents': ' (Bar)', 'highlight_groups': ['csv:column_name', 'csv'] + }]) + vim_module._set_cursor(7, 6) + self.assertEqual(csv_col_current(), [{ + 'contents': '3', 'highlight_groups': ['csv:column_number', 'csv'] + }, { + 'contents': ' (Baz)', 'highlight_groups': ['csv:column_name', 'csv'] + }]) + self.assertEqual(csv_col_current(name_format=' ({column_name:.1})'), [{ + 'contents': '3', 'highlight_groups': ['csv:column_number', 'csv'] + }, { + 'contents': ' (B)', 'highlight_groups': ['csv:column_name', 'csv'] + }]) + self.assertEqual(csv_col_current(display_name=True, name_format=' ({column_name:.1})'), [{ + 'contents': '3', 'highlight_groups': ['csv:column_number', 'csv'] + }, { + 'contents': ' (B)', 'highlight_groups': ['csv:column_name', 'csv'] + }]) + self.assertEqual(csv_col_current(display_name=False, name_format=' ({column_name:.1})'), [{ + 'contents': '3', 'highlight_groups': ['csv:column_number', 'csv'] + }]) + self.assertEqual(csv_col_current(display_name=False), [{ + 'contents': '3', 'highlight_groups': ['csv:column_number', 'csv'] + }]) + finally: + vim_module._bw(segment_info['bufnr']) + + @classmethod + def setUpClass(cls): + sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(os.path.dirname(__file__)), 'vim_sys_path'))) + from powerline.segments import vim + cls.vim = vim + from powerline.segments.common import vcs + cls.vcs = vcs + + @classmethod + def tearDownClass(cls): + sys.path.pop(0) + + +class TestPDB(TestCase): + def test_current_line(self): + pl = Pl() + self.assertEqual(pdb.current_line(pl=pl, segment_info={'curframe': Args(f_lineno=10)}), '10') + + def test_current_file(self): + pl = Pl() + cf = lambda **kwargs: pdb.current_file( + pl=pl, + segment_info={'curframe': Args(f_code=Args(co_filename='/tmp/abc.py'))}, + **kwargs + ) + self.assertEqual(cf(), 'abc.py') + self.assertEqual(cf(basename=True), 'abc.py') + self.assertEqual(cf(basename=False), '/tmp/abc.py') + + def test_current_code_name(self): + pl = Pl() + ccn = lambda **kwargs: pdb.current_code_name( + pl=pl, + segment_info={'curframe': Args(f_code=Args(co_name='<module>'))}, + **kwargs + ) + self.assertEqual(ccn(), '<module>') + + def test_current_context(self): + pl = Pl() + cc = lambda **kwargs: pdb.current_context( + pl=pl, + segment_info={'curframe': Args(f_code=Args(co_name='<module>', co_filename='/tmp/abc.py'))}, + **kwargs + ) + self.assertEqual(cc(), 'abc.py') + + def test_stack_depth(self): + pl = Pl() + sd = lambda **kwargs: pdb.stack_depth( + pl=pl, + segment_info={'pdb': Args(stack=[1, 2, 3]), 'initial_stack_length': 1}, + **kwargs + ) + self.assertEqual(sd(), '2') + self.assertEqual(sd(full_stack=False), '2') + self.assertEqual(sd(full_stack=True), '3') + + +old_cwd = None + + +def setUpModule(): + global old_cwd + global __file__ + old_cwd = os.getcwd() + __file__ = os.path.abspath(__file__) + os.chdir(os.path.dirname(os.path.dirname(__file__))) + + +def tearDownModule(): + global old_cwd + os.chdir(old_cwd) + + +if __name__ == '__main__': + from tests.modules import main + main() diff --git a/tests/test_python/test_selectors.py b/tests/test_python/test_selectors.py new file mode 100644 index 0000000..74ace8d --- /dev/null +++ b/tests/test_python/test_selectors.py @@ -0,0 +1,36 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import os +import sys + +from functools import partial + +import tests.modules.vim as vim_module + +from tests.modules.lib import Pl +from tests.modules import TestCase + + +class TestVim(TestCase): + def test_single_tab(self): + pl = Pl() + single_tab = partial(self.vim.single_tab, pl=pl, segment_info=None, mode=None) + with vim_module._with('tabpage'): + self.assertEqual(single_tab(), False) + self.assertEqual(single_tab(), True) + + @classmethod + def setUpClass(cls): + sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(os.path.dirname(__file__)), 'vim_sys_path'))) + from powerline.selectors import vim + cls.vim = vim + + @classmethod + def tearDownClass(cls): + sys.path.pop(0) + + +if __name__ == '__main__': + from tests.modules import main + main() diff --git a/tests/test_python/test_watcher.py b/tests/test_python/test_watcher.py new file mode 100644 index 0000000..a246d0b --- /dev/null +++ b/tests/test_python/test_watcher.py @@ -0,0 +1,245 @@ +# vim:fileencoding=utf-8:noet +from __future__ import (unicode_literals, division, absolute_import, print_function) + +import shutil +import os + +from time import sleep +from functools import partial +from errno import ENOENT + +from powerline.lib.watcher import create_file_watcher, create_tree_watcher, INotifyError +from powerline.lib.watcher.uv import UvNotFound +from powerline import get_fallback_logger +from powerline.lib.monotonic import monotonic + +from tests.modules import TestCase, SkipTest + + +INOTIFY_DIR = 'inotify' + os.path.basename(os.environ.get('PYTHON', '')) + + +def clear_dir(dir): + for root, dirs, files in list(os.walk(dir, topdown=False)): + for f in files: + os.remove(os.path.join(root, f)) + for d in dirs: + os.rmdir(os.path.join(root, d)) + + +def set_watcher_tests(l): + byte_tests = (('bytes', True), ('unicode', False)) + + for btn, use_bytes in byte_tests: + def test_inotify_file_watcher(self, use_bytes=use_bytes): + try: + w = create_file_watcher(pl=get_fallback_logger(), watcher_type='inotify') + except INotifyError: + raise SkipTest('This test is not suitable for a stat based file watcher') + self.do_test_file_watcher(w, use_bytes) + + def test_uv_file_watcher(self, use_bytes=use_bytes): + raise SkipTest('Uv watcher tests are not stable') + try: + w = create_file_watcher(pl=get_fallback_logger(), watcher_type='uv') + except UvNotFound: + raise SkipTest('Pyuv is not available') + self.do_test_file_watcher(w, use_bytes) + + def test_inotify_tree_watcher(self, use_bytes=use_bytes): + try: + tw = create_tree_watcher(get_fallback_logger(), watcher_type='inotify') + except INotifyError: + raise SkipTest('INotify is not available') + self.do_test_tree_watcher(tw, use_bytes) + + def test_uv_tree_watcher(self, use_bytes=use_bytes): + raise SkipTest('Uv watcher tests are not stable') + try: + tw = create_tree_watcher(get_fallback_logger(), 'uv') + except UvNotFound: + raise SkipTest('Pyuv is not available') + self.do_test_tree_watcher(tw, use_bytes) + + def test_inotify_file_watcher_is_watching(self, use_bytes=use_bytes): + try: + w = create_file_watcher(pl=get_fallback_logger(), watcher_type='inotify') + except INotifyError: + raise SkipTest('INotify is not available') + self.do_test_file_watcher_is_watching(w, use_bytes) + + def test_stat_file_watcher_is_watching(self, use_bytes=use_bytes): + w = create_file_watcher(pl=get_fallback_logger(), watcher_type='stat') + self.do_test_file_watcher_is_watching(w, use_bytes) + + def test_uv_file_watcher_is_watching(self, use_bytes=use_bytes): + try: + w = create_file_watcher(pl=get_fallback_logger(), watcher_type='uv') + except UvNotFound: + raise SkipTest('Pyuv is not available') + self.do_test_file_watcher_is_watching(w, use_bytes) + + for wt in ('uv', 'inotify'): + l['test_{0}_file_watcher_{1}'.format(wt, btn)] = locals()['test_{0}_file_watcher'.format(wt)] + l['test_{0}_tree_watcher_{1}'.format(wt, btn)] = locals()['test_{0}_tree_watcher'.format(wt)] + l['test_{0}_file_watcher_is_watching_{1}'.format(wt, btn)] = ( + locals()['test_{0}_file_watcher_is_watching'.format(wt)]) + l['test_{0}_file_watcher_is_watching_{1}'.format('stat', btn)] = ( + locals()['test_{0}_file_watcher_is_watching'.format('stat')]) + + +class TestFilesystemWatchers(TestCase): + def do_test_for_change(self, watcher, path): + st = monotonic() + while monotonic() - st < 1: + if watcher(path): + return + sleep(0.1) + self.fail('The change to {0} was not detected'.format(path)) + + def do_test_file_watcher(self, w, use_bytes=False): + try: + f1, f2, f3 = map(lambda x: os.path.join(INOTIFY_DIR, 'file%d' % x), (1, 2, 3)) + ne = os.path.join(INOTIFY_DIR, 'notexists') + if use_bytes: + f1 = f1.encode('utf-8') + f2 = f2.encode('utf-8') + f3 = f3.encode('utf-8') + ne = ne.encode('utf-8') + with open(f1, 'wb'): + with open(f2, 'wb'): + with open(f3, 'wb'): + pass + self.assertRaises(OSError, w, ne) + self.assertTrue(w(f1)) + self.assertTrue(w(f2)) + os.utime(f1, None), os.utime(f2, None) + self.do_test_for_change(w, f1) + self.do_test_for_change(w, f2) + # Repeat once + os.utime(f1, None), os.utime(f2, None) + self.do_test_for_change(w, f1) + self.do_test_for_change(w, f2) + # Check that no false changes are reported + self.assertFalse(w(f1), 'Spurious change detected') + self.assertFalse(w(f2), 'Spurious change detected') + # Check that open the file with 'w' triggers a change + with open(f1, 'wb'): + with open(f2, 'wb'): + pass + self.do_test_for_change(w, f1) + self.do_test_for_change(w, f2) + # Check that writing to a file with 'a' triggers a change + with open(f1, 'ab') as f: + f.write(b'1') + self.do_test_for_change(w, f1) + # Check that deleting a file registers as a change + os.unlink(f1) + self.do_test_for_change(w, f1) + # Test that changing the inode of a file does not cause it to stop + # being watched + os.rename(f3, f2) + self.do_test_for_change(w, f2) + self.assertFalse(w(f2), 'Spurious change detected') + os.utime(f2, None) + self.do_test_for_change(w, f2) + finally: + clear_dir(INOTIFY_DIR) + + def do_test_tree_watcher(self, tw, use_bytes=False): + try: + inotify_dir = INOTIFY_DIR + subdir = os.path.join(inotify_dir, 'subdir') + t1 = os.path.join(inotify_dir, 'tree1') + ts1 = os.path.join(subdir, 'tree1') + suffix = '1' + f = os.path.join(subdir, 'f') + if use_bytes: + inotify_dir = inotify_dir.encode('utf-8') + subdir = subdir.encode('utf-8') + t1 = t1.encode('utf-8') + ts1 = ts1.encode('utf-8') + suffix = suffix.encode('utf-8') + f = f.encode('utf-8') + os.mkdir(subdir) + try: + if tw.watch(inotify_dir).is_dummy: + raise SkipTest('No tree watcher available') + except UvNotFound: + raise SkipTest('Pyuv is not available') + except INotifyError: + raise SkipTest('INotify is not available') + self.assertTrue(tw(inotify_dir)) + self.assertFalse(tw(inotify_dir)) + changed = partial(self.do_test_for_change, tw, inotify_dir) + open(t1, 'w').close() + changed() + open(ts1, 'w').close() + changed() + os.unlink(ts1) + changed() + os.rmdir(subdir) + changed() + os.mkdir(subdir) + changed() + os.rename(subdir, subdir + suffix) + changed() + shutil.rmtree(subdir + suffix) + changed() + os.mkdir(subdir) + open(f, 'w').close() + changed() + with open(f, 'a') as s: + s.write(' ') + changed() + os.rename(f, f + suffix) + changed() + finally: + clear_dir(inotify_dir) + + def do_test_file_watcher_is_watching(self, w, use_bytes=False): + try: + f1, f2, f3 = map(lambda x: os.path.join(INOTIFY_DIR, 'file%d' % x), (1, 2, 3)) + ne = os.path.join(INOTIFY_DIR, 'notexists') + if use_bytes: + f1 = f1.encode('utf-8') + f2 = f2.encode('utf-8') + f3 = f3.encode('utf-8') + ne = ne.encode('utf-8') + with open(f1, 'wb'): + with open(f2, 'wb'): + with open(f3, 'wb'): + pass + self.assertRaises(OSError, w, ne) + try: + w(ne) + except OSError as e: + self.assertEqual(e.errno, ENOENT) + self.assertTrue(w(f1)) + self.assertFalse(w.is_watching(ne)) + self.assertTrue(w.is_watching(f1)) + self.assertFalse(w.is_watching(f2)) + finally: + clear_dir(INOTIFY_DIR) + + set_watcher_tests(locals()) + + +old_cwd = None + + +def setUpModule(): + global old_cwd + old_cwd = os.getcwd() + os.chdir(os.path.dirname(os.path.dirname(__file__))) + os.mkdir(INOTIFY_DIR) + + +def tearDownModule(): + shutil.rmtree(INOTIFY_DIR) + os.chdir(old_cwd) + + +if __name__ == '__main__': + from tests.modules import main + main() |