Diffstat (limited to 'powerline/lint')
-rw-r--r--  powerline/lint/__init__.py                625
-rw-r--r--  powerline/lint/checks.py                  866
-rw-r--r--  powerline/lint/context.py                  68
-rw-r--r--  powerline/lint/imp.py                      56
-rw-r--r--  powerline/lint/inspect.py                  63
-rw-r--r--  powerline/lint/markedjson/__init__.py      19
-rw-r--r--  powerline/lint/markedjson/composer.py     119
-rw-r--r--  powerline/lint/markedjson/constructor.py  285
-rw-r--r--  powerline/lint/markedjson/error.py        241
-rw-r--r--  powerline/lint/markedjson/events.py        97
-rw-r--r--  powerline/lint/markedjson/loader.py        25
-rw-r--r--  powerline/lint/markedjson/markedvalue.py  151
-rw-r--r--  powerline/lint/markedjson/nodes.py         55
-rw-r--r--  powerline/lint/markedjson/parser.py       255
-rw-r--r--  powerline/lint/markedjson/reader.py       141
-rw-r--r--  powerline/lint/markedjson/resolver.py     131
-rw-r--r--  powerline/lint/markedjson/scanner.py      499
-rw-r--r--  powerline/lint/markedjson/tokens.py        72
-rw-r--r--  powerline/lint/selfcheck.py                16
-rw-r--r--  powerline/lint/spec.py                    759
20 files changed, 4543 insertions, 0 deletions
diff --git a/powerline/lint/__init__.py b/powerline/lint/__init__.py
new file mode 100644
index 0000000..8c68271
--- /dev/null
+++ b/powerline/lint/__init__.py
@@ -0,0 +1,625 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import os
+import logging
+
+from collections import defaultdict
+from itertools import chain
+from functools import partial
+
+from powerline import generate_config_finder, get_config_paths, load_config
+from powerline.segments.vim import vim_modes
+from powerline.lib.dict import mergedicts_copy
+from powerline.lib.config import ConfigLoader
+from powerline.lib.unicode import unicode
+from powerline.lib.path import join
+from powerline.lint.markedjson import load
+from powerline.lint.markedjson.error import echoerr, EchoErr, MarkedError
+from powerline.lint.checks import (check_matcher_func, check_ext, check_config, check_top_theme,
+ check_color, check_translated_group_name, check_group,
+ check_segment_module, check_exinclude_function, type_keys,
+ check_segment_function, check_args, get_one_segment_function,
+ check_highlight_groups, check_highlight_group, check_full_segment_data,
+ get_all_possible_functions, check_segment_data_key, register_common_name,
+ highlight_group_spec, check_log_file_level, check_logging_handler)
+from powerline.lint.spec import Spec
+from powerline.lint.context import Context
+
+
+def open_file(path):
+ return open(path, 'rb')
+
+
+def generate_json_config_loader(lhadproblem):
+ def load_json_config(config_file_path, load=load, open_file=open_file):
+ with open_file(config_file_path) as config_file_fp:
+ r, hadproblem = load(config_file_fp)
+ if hadproblem:
+ lhadproblem[0] = True
+ return r
+ return load_json_config
+
+
+function_name_re = r'^(\w+\.)*[a-zA-Z_]\w*$'
+
+
+divider_spec = Spec().printable().len(
+ 'le', 3, (lambda value: 'Divider {0!r} is too large!'.format(value))).copy
+ext_theme_spec = Spec().type(unicode).func(lambda *args: check_config('themes', *args)).copy
+top_theme_spec = Spec().type(unicode).func(check_top_theme).copy
+ext_spec = Spec(
+ colorscheme=Spec().type(unicode).func(
+ (lambda *args: check_config('colorschemes', *args))
+ ),
+ theme=ext_theme_spec(),
+ top_theme=top_theme_spec().optional(),
+).copy
+gen_components_spec = (lambda *components: Spec().list(Spec().type(unicode).oneof(set(components))))
+log_level_spec = Spec().re('^[A-Z]+$').func(
+ (lambda value, *args: (True, True, not hasattr(logging, value))),
+ (lambda value: 'unknown debugging level {0}'.format(value))
+).copy
+log_format_spec = Spec().type(unicode).copy
+main_spec = (Spec(
+ common=Spec(
+ default_top_theme=top_theme_spec().optional(),
+ term_truecolor=Spec().type(bool).optional(),
+ term_escape_style=Spec().type(unicode).oneof(set(('auto', 'xterm', 'fbterm'))).optional(),
+		# Python is capable of loading from zip archives. Thus we only check
+		# that the path exists, not that it is a directory
+ paths=Spec().list(
+ (lambda value, *args: (True, True, not os.path.exists(os.path.expanduser(value.value)))),
+ (lambda value: 'path does not exist: {0}'.format(value))
+ ).optional(),
+ log_file=Spec().either(
+ Spec().type(unicode).func(
+ (
+ lambda value, *args: (
+ True,
+ True,
+ not os.path.isdir(os.path.dirname(os.path.expanduser(value)))
+ )
+ ),
+ (lambda value: 'directory does not exist: {0}'.format(os.path.dirname(value)))
+ ),
+ Spec().list(Spec().either(
+ Spec().type(unicode, type(None)),
+ Spec().tuple(
+ Spec().re(function_name_re).func(check_logging_handler),
+ Spec().tuple(
+ Spec().type(list).optional(),
+ Spec().type(dict).optional(),
+ ),
+ log_level_spec().func(check_log_file_level).optional(),
+ log_format_spec().optional(),
+ ),
+ ))
+ ).optional(),
+ log_level=log_level_spec().optional(),
+ log_format=log_format_spec().optional(),
+ interval=Spec().either(Spec().cmp('gt', 0.0), Spec().type(type(None))).optional(),
+ reload_config=Spec().type(bool).optional(),
+ watcher=Spec().type(unicode).oneof(set(('auto', 'inotify', 'stat'))).optional(),
+ ).context_message('Error while loading common configuration (key {key})'),
+ ext=Spec(
+ vim=ext_spec().update(
+ components=gen_components_spec('statusline', 'tabline').optional(),
+ local_themes=Spec(
+ __tabline__=ext_theme_spec(),
+ ).unknown_spec(
+ Spec().re(function_name_re).func(partial(check_matcher_func, 'vim')),
+ ext_theme_spec()
+ ),
+ ).optional(),
+ ipython=ext_spec().update(
+ local_themes=Spec(
+ in2=ext_theme_spec(),
+ out=ext_theme_spec(),
+ rewrite=ext_theme_spec(),
+ ),
+ ).optional(),
+ shell=ext_spec().update(
+ components=gen_components_spec('tmux', 'prompt').optional(),
+ local_themes=Spec(
+ continuation=ext_theme_spec(),
+ select=ext_theme_spec(),
+ ),
+ ).optional(),
+ wm=ext_spec().update(
+ local_themes=Spec().unknown_spec(
+ Spec().re('^[0-9A-Za-z-]+$'),
+ ext_theme_spec()
+ ).optional(),
+ update_interval=Spec().cmp('gt', 0.0).optional(),
+ ).optional(),
+ ).unknown_spec(
+ check_ext,
+ ext_spec(),
+ ).context_message('Error while loading extensions configuration (key {key})'),
+).context_message('Error while loading main configuration'))
+
+term_color_spec = Spec().unsigned().cmp('le', 255).copy
+true_color_spec = Spec().re(
+ '^[0-9a-fA-F]{6}$',
+ (lambda value: '"{0}" is not a six-digit hexadecimal unsigned integer written as a string'.format(value))
+).copy
+colors_spec = (Spec(
+ colors=Spec().unknown_spec(
+ Spec().ident(),
+ Spec().either(
+ Spec().tuple(term_color_spec(), true_color_spec()),
+ term_color_spec()
+ )
+ ).context_message('Error while checking colors (key {key})'),
+ gradients=Spec().unknown_spec(
+ Spec().ident(),
+ Spec().tuple(
+ Spec().len('gt', 1).list(term_color_spec()),
+ Spec().len('gt', 1).list(true_color_spec()).optional(),
+ )
+ ).context_message('Error while checking gradients (key {key})'),
+).context_message('Error while loading colors configuration'))
+
+
+color_spec = Spec().type(unicode).func(check_color).copy
+name_spec = Spec().type(unicode).len('gt', 0).optional().copy
+group_name_spec = Spec().ident().copy
+group_spec = Spec().either(Spec(
+ fg=color_spec(),
+ bg=color_spec(),
+ attrs=Spec().list(Spec().type(unicode).oneof(set(('bold', 'italic', 'underline')))),
+), group_name_spec().func(check_group)).copy
+groups_spec = Spec().unknown_spec(
+ group_name_spec(),
+ group_spec(),
+).context_message('Error while loading groups (key {key})').copy
+colorscheme_spec = (Spec(
+ name=name_spec(),
+ groups=groups_spec(),
+).context_message('Error while loading colorscheme'))
+mode_translations_value_spec = Spec(
+ colors=Spec().unknown_spec(
+ color_spec(),
+ color_spec(),
+ ).optional(),
+ groups=Spec().unknown_spec(
+ group_name_spec().func(check_translated_group_name),
+ group_spec(),
+ ).optional(),
+).copy
+top_colorscheme_spec = (Spec(
+ name=name_spec(),
+ groups=groups_spec(),
+ mode_translations=Spec().unknown_spec(
+ Spec().type(unicode),
+ mode_translations_value_spec(),
+ ).optional().context_message('Error while loading mode translations (key {key})').optional(),
+).context_message('Error while loading top-level colorscheme'))
+vim_mode_spec = Spec().oneof(set(list(vim_modes) + ['nc', 'tab_nc', 'buf_nc'])).copy
+vim_colorscheme_spec = (Spec(
+ name=name_spec(),
+ groups=groups_spec(),
+ mode_translations=Spec().unknown_spec(
+ vim_mode_spec(),
+ mode_translations_value_spec(),
+ ).optional().context_message('Error while loading mode translations (key {key})'),
+).context_message('Error while loading vim colorscheme'))
+shell_mode_spec = Spec().re(r'^(?:[\w\-]+|\.safe)$').copy
+shell_colorscheme_spec = (Spec(
+ name=name_spec(),
+ groups=groups_spec(),
+ mode_translations=Spec().unknown_spec(
+ shell_mode_spec(),
+ mode_translations_value_spec(),
+ ).optional().context_message('Error while loading mode translations (key {key})'),
+).context_message('Error while loading shell colorscheme'))
+
+
+args_spec = Spec(
+ pl=Spec().error('pl object must be set by powerline').optional(),
+ segment_info=Spec().error('Segment info dictionary must be set by powerline').optional(),
+).unknown_spec(Spec(), Spec()).optional().copy
+segment_module_spec = Spec().type(unicode).func(check_segment_module).optional().copy
+exinclude_spec = Spec().re(function_name_re).func(check_exinclude_function).copy
+segment_spec_base = Spec(
+	name=Spec().re(r'^[a-zA-Z_]\w*$').optional(),
+ function=Spec().re(function_name_re).func(check_segment_function).optional(),
+ exclude_modes=Spec().list(vim_mode_spec()).optional(),
+ include_modes=Spec().list(vim_mode_spec()).optional(),
+ exclude_function=exinclude_spec().optional(),
+ include_function=exinclude_spec().optional(),
+ draw_hard_divider=Spec().type(bool).optional(),
+ draw_soft_divider=Spec().type(bool).optional(),
+ draw_inner_divider=Spec().type(bool).optional(),
+ display=Spec().type(bool).optional(),
+ module=segment_module_spec(),
+ priority=Spec().type(int, float, type(None)).optional(),
+ after=Spec().printable().optional(),
+ before=Spec().printable().optional(),
+ width=Spec().either(Spec().unsigned(), Spec().cmp('eq', 'auto')).optional(),
+ align=Spec().oneof(set('lr')).optional(),
+ args=args_spec().func(lambda *args, **kwargs: check_args(get_one_segment_function, *args, **kwargs)),
+ contents=Spec().printable().optional(),
+ highlight_groups=Spec().list(
+ highlight_group_spec().re(
+ '^(?:(?!:divider$).)+$',
+ (lambda value: 'it is recommended that only divider highlight group names end with ":divider"')
+ )
+ ).func(check_highlight_groups).optional(),
+ divider_highlight_group=highlight_group_spec().func(check_highlight_group).re(
+ ':divider$',
+ (lambda value: 'it is recommended that divider highlight group names end with ":divider"')
+ ).optional(),
+).func(check_full_segment_data).copy
+subsegment_spec = segment_spec_base().update(
+ type=Spec().oneof(set((key for key in type_keys if key != 'segment_list'))).optional(),
+)
+segment_spec = segment_spec_base().update(
+ type=Spec().oneof(type_keys).optional(),
+ segments=Spec().optional().list(subsegment_spec),
+)
+segments_spec = Spec().optional().list(segment_spec).copy
+segdict_spec = Spec(
+ left=segments_spec().context_message('Error while loading segments from left side (key {key})'),
+ right=segments_spec().context_message('Error while loading segments from right side (key {key})'),
+).func(
+ (lambda value, *args: (True, True, not (('left' in value) or ('right' in value)))),
+ (lambda value: 'segments dictionary must contain either left, right or both keys')
+).context_message('Error while loading segments (key {key})').copy
+divside_spec = Spec(
+ hard=divider_spec(),
+ soft=divider_spec(),
+).copy
+segment_data_value_spec = Spec(
+ after=Spec().printable().optional(),
+ before=Spec().printable().optional(),
+ display=Spec().type(bool).optional(),
+ args=args_spec().func(lambda *args, **kwargs: check_args(get_all_possible_functions, *args, **kwargs)),
+ contents=Spec().printable().optional(),
+).copy
+dividers_spec = Spec(
+ left=divside_spec(),
+ right=divside_spec(),
+).copy
+spaces_spec = Spec().unsigned().cmp(
+ 'le', 2, (lambda value: 'Are you sure you need such a big ({0}) number of spaces?'.format(value))
+).copy
+common_theme_spec = Spec(
+ default_module=segment_module_spec().optional(),
+ cursor_space=Spec().type(int, float).cmp('le', 100).cmp('gt', 0).optional(),
+ cursor_columns=Spec().type(int).cmp('gt', 0).optional(),
+).context_message('Error while loading theme').copy
+top_theme_spec = common_theme_spec().update(
+ dividers=dividers_spec(),
+ spaces=spaces_spec(),
+ use_non_breaking_spaces=Spec().type(bool).optional(),
+ segment_data=Spec().unknown_spec(
+ Spec().func(check_segment_data_key),
+ segment_data_value_spec(),
+ ).optional().context_message('Error while loading segment data (key {key})'),
+)
+main_theme_spec = common_theme_spec().update(
+ dividers=dividers_spec().optional(),
+ spaces=spaces_spec().optional(),
+ segment_data=Spec().unknown_spec(
+ Spec().func(check_segment_data_key),
+ segment_data_value_spec(),
+ ).optional().context_message('Error while loading segment data (key {key})'),
+)
+theme_spec = common_theme_spec().update(
+ dividers=dividers_spec().optional(),
+ spaces=spaces_spec().optional(),
+ segment_data=Spec().unknown_spec(
+ Spec().func(check_segment_data_key),
+ segment_data_value_spec(),
+ ).optional().context_message('Error while loading segment data (key {key})'),
+ segments=segdict_spec().update(above=Spec().list(segdict_spec()).optional()),
+)
+
+
+def register_common_names():
+ register_common_name('player', 'powerline.segments.common.players', '_player')
+
+
+def load_json_file(path):
+ with open_file(path) as F:
+ try:
+ config, hadproblem = load(F)
+ except MarkedError as e:
+ return True, None, str(e)
+ else:
+ return hadproblem, config, None
+
+
+def updated_with_config(d):
+ hadproblem, config, error = load_json_file(d['path'])
+ d.update(
+ hadproblem=hadproblem,
+ config=config,
+ error=error,
+ )
+ return d
+
+
+def find_all_ext_config_files(search_paths, subdir):
+ for config_root in search_paths:
+ top_config_subpath = join(config_root, subdir)
+ if not os.path.isdir(top_config_subpath):
+ if os.path.exists(top_config_subpath):
+ yield {
+ 'error': 'Path {0} is not a directory'.format(top_config_subpath),
+ 'path': top_config_subpath,
+ }
+ continue
+ for ext_name in os.listdir(top_config_subpath):
+ ext_path = os.path.join(top_config_subpath, ext_name)
+ if not os.path.isdir(ext_path):
+ if ext_name.endswith('.json') and os.path.isfile(ext_path):
+ yield updated_with_config({
+ 'error': False,
+ 'path': ext_path,
+ 'name': ext_name[:-5],
+ 'ext': None,
+ 'type': 'top_' + subdir,
+ })
+ else:
+ yield {
+ 'error': 'Path {0} is not a directory or configuration file'.format(ext_path),
+ 'path': ext_path,
+ }
+ continue
+ for config_file_name in os.listdir(ext_path):
+ config_file_path = os.path.join(ext_path, config_file_name)
+ if config_file_name.endswith('.json') and os.path.isfile(config_file_path):
+ yield updated_with_config({
+ 'error': False,
+ 'path': config_file_path,
+ 'name': config_file_name[:-5],
+ 'ext': ext_name,
+ 'type': subdir,
+ })
+ else:
+ yield {
+ 'error': 'Path {0} is not a configuration file'.format(config_file_path),
+ 'path': config_file_path,
+ }
+
+
+def dict2(d):
+ return defaultdict(dict, ((k, dict(v)) for k, v in d.items()))
+
+
+def check(paths=None, debug=False, echoerr=echoerr, require_ext=None):
+ '''Check configuration sanity
+
+ :param list paths:
+ Paths from which configuration should be loaded.
+ :param bool debug:
+		Determines whether information useful for debugging the linter should
+		be output.
+ :param function echoerr:
+		Function that will be used to echo the error(s). Should accept four
+		optional keyword parameters: ``problem``, ``problem_mark``,
+		``context`` and ``context_mark``.
+ :param str require_ext:
+ Require configuration for some extension to be present.
+
+ :return:
+ ``False`` if user configuration seems to be completely sane and ``True``
+ if some problems were found.
+ '''
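+	# A minimal sketch of a compatible ``echoerr`` callable (hypothetical, for
+	# illustration only; the default comes from powerline.lint.markedjson.error):
+	#
+	#     def my_echoerr(*args, **kwargs):
+	#         print('{0}: {1}'.format(kwargs.get('context'), kwargs.get('problem')))
+	#
+	#     check(paths=['/path/to/config'], echoerr=my_echoerr)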
+ hadproblem = False
+
+ register_common_names()
+ search_paths = paths or get_config_paths()
+ find_config_files = generate_config_finder(lambda: search_paths)
+
+ logger = logging.getLogger('powerline-lint')
+ logger.setLevel(logging.DEBUG if debug else logging.ERROR)
+ logger.addHandler(logging.StreamHandler())
+
+ ee = EchoErr(echoerr, logger)
+
+ if require_ext:
+ used_main_spec = main_spec.copy()
+ try:
+ used_main_spec['ext'][require_ext].required()
+ except KeyError:
+ used_main_spec['ext'][require_ext] = ext_spec()
+ else:
+ used_main_spec = main_spec
+
+ lhadproblem = [False]
+ load_json_config = generate_json_config_loader(lhadproblem)
+
+ config_loader = ConfigLoader(run_once=True, load=load_json_config)
+
+ lists = {
+ 'colorschemes': set(),
+ 'themes': set(),
+ 'exts': set(),
+ }
+ found_dir = {
+ 'themes': False,
+ 'colorschemes': False,
+ }
+ config_paths = defaultdict(lambda: defaultdict(dict))
+ loaded_configs = defaultdict(lambda: defaultdict(dict))
+ for d in chain(
+ find_all_ext_config_files(search_paths, 'colorschemes'),
+ find_all_ext_config_files(search_paths, 'themes'),
+ ):
+ if d['error']:
+ hadproblem = True
+ ee(problem=d['error'])
+ continue
+ if d['hadproblem']:
+ hadproblem = True
+ if d['ext']:
+ found_dir[d['type']] = True
+ lists['exts'].add(d['ext'])
+ if d['name'] == '__main__':
+ pass
+ elif d['name'].startswith('__') or d['name'].endswith('__'):
+ hadproblem = True
+ ee(problem='File name is not supposed to start or end with “__”: {0}'.format(
+ d['path']))
+ else:
+ lists[d['type']].add(d['name'])
+ config_paths[d['type']][d['ext']][d['name']] = d['path']
+ loaded_configs[d['type']][d['ext']][d['name']] = d['config']
+ else:
+ config_paths[d['type']][d['name']] = d['path']
+ loaded_configs[d['type']][d['name']] = d['config']
+
+ for typ in ('themes', 'colorschemes'):
+ if not found_dir[typ]:
+ hadproblem = True
+ ee(problem='Subdirectory {0} was not found in paths {1}'.format(typ, ', '.join(search_paths)))
+
+ diff = set(config_paths['colorschemes']) - set(config_paths['themes'])
+ if diff:
+ hadproblem = True
+ for ext in diff:
+ typ = 'colorschemes' if ext in config_paths['themes'] else 'themes'
+ if not config_paths['top_' + typ] or typ == 'themes':
+ ee(problem='{0} extension {1} not present in {2}'.format(
+ ext,
+ 'configuration' if (
+ ext in loaded_configs['themes'] and ext in loaded_configs['colorschemes']
+ ) else 'directory',
+ typ,
+ ))
+
+ try:
+ main_config = load_config('config', find_config_files, config_loader)
+ except IOError:
+ main_config = {}
+ ee(problem='Configuration file not found: config.json')
+ hadproblem = True
+ except MarkedError as e:
+ main_config = {}
+ ee(problem=str(e))
+ hadproblem = True
+ else:
+ if used_main_spec.match(
+ main_config,
+ data={'configs': config_paths, 'lists': lists},
+ context=Context(main_config),
+ echoerr=ee
+ )[1]:
+ hadproblem = True
+
+ import_paths = [os.path.expanduser(path) for path in main_config.get('common', {}).get('paths', [])]
+
+ try:
+ colors_config = load_config('colors', find_config_files, config_loader)
+ except IOError:
+ colors_config = {}
+ ee(problem='Configuration file not found: colors.json')
+ hadproblem = True
+ except MarkedError as e:
+ colors_config = {}
+ ee(problem=str(e))
+ hadproblem = True
+ else:
+ if colors_spec.match(colors_config, context=Context(colors_config), echoerr=ee)[1]:
+ hadproblem = True
+
+ if lhadproblem[0]:
+ hadproblem = True
+
+ top_colorscheme_configs = dict(loaded_configs['top_colorschemes'])
+ data = {
+ 'ext': None,
+ 'top_colorscheme_configs': top_colorscheme_configs,
+ 'ext_colorscheme_configs': {},
+ 'colors_config': colors_config
+ }
+ for colorscheme, config in loaded_configs['top_colorschemes'].items():
+ data['colorscheme'] = colorscheme
+ if top_colorscheme_spec.match(config, context=Context(config), data=data, echoerr=ee)[1]:
+ hadproblem = True
+
+ ext_colorscheme_configs = dict2(loaded_configs['colorschemes'])
+ for ext, econfigs in ext_colorscheme_configs.items():
+ data = {
+ 'ext': ext,
+ 'top_colorscheme_configs': top_colorscheme_configs,
+ 'ext_colorscheme_configs': ext_colorscheme_configs,
+ 'colors_config': colors_config,
+ }
+ for colorscheme, config in econfigs.items():
+ data['colorscheme'] = colorscheme
+ if ext == 'vim':
+ spec = vim_colorscheme_spec
+ elif ext == 'shell':
+ spec = shell_colorscheme_spec
+ else:
+ spec = colorscheme_spec
+ if spec.match(config, context=Context(config), data=data, echoerr=ee)[1]:
+ hadproblem = True
+
+ colorscheme_configs = {}
+ for ext in lists['exts']:
+ colorscheme_configs[ext] = {}
+ for colorscheme in lists['colorschemes']:
+ econfigs = ext_colorscheme_configs[ext]
+ ecconfigs = econfigs.get(colorscheme)
+ mconfigs = (
+ top_colorscheme_configs.get(colorscheme),
+ econfigs.get('__main__'),
+ ecconfigs,
+ )
+ if not (mconfigs[0] or mconfigs[2]):
+ continue
+ config = None
+ for mconfig in mconfigs:
+ if not mconfig:
+ continue
+ if config:
+ config = mergedicts_copy(config, mconfig)
+ else:
+ config = mconfig
+ colorscheme_configs[ext][colorscheme] = config
+
+ theme_configs = dict2(loaded_configs['themes'])
+ top_theme_configs = dict(loaded_configs['top_themes'])
+ for ext, configs in theme_configs.items():
+ data = {
+ 'ext': ext,
+ 'colorscheme_configs': colorscheme_configs,
+ 'import_paths': import_paths,
+ 'main_config': main_config,
+ 'top_themes': top_theme_configs,
+ 'ext_theme_configs': configs,
+ 'colors_config': colors_config
+ }
+ for theme, config in configs.items():
+ data['theme'] = theme
+ if theme == '__main__':
+ data['theme_type'] = 'main'
+ spec = main_theme_spec
+ else:
+ data['theme_type'] = 'regular'
+ spec = theme_spec
+ if spec.match(config, context=Context(config), data=data, echoerr=ee)[1]:
+ hadproblem = True
+
+ for top_theme, config in top_theme_configs.items():
+ data = {
+ 'ext': None,
+ 'colorscheme_configs': colorscheme_configs,
+ 'import_paths': import_paths,
+ 'main_config': main_config,
+ 'theme_configs': theme_configs,
+ 'ext_theme_configs': None,
+ 'colors_config': colors_config
+ }
+ data['theme_type'] = 'top'
+ data['theme'] = top_theme
+ if top_theme_spec.match(config, context=Context(config), data=data, echoerr=ee)[1]:
+ hadproblem = True
+
+ return hadproblem
diff --git a/powerline/lint/checks.py b/powerline/lint/checks.py
new file mode 100644
index 0000000..8d9cb12
--- /dev/null
+++ b/powerline/lint/checks.py
@@ -0,0 +1,866 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import os
+import re
+import logging
+
+from collections import defaultdict
+
+from powerline.lib.threaded import ThreadedSegment
+from powerline.lib.unicode import unicode
+from powerline.lint.markedjson.markedvalue import MarkedUnicode
+from powerline.lint.markedjson.error import DelayedEchoErr, Mark
+from powerline.lint.selfcheck import havemarks
+from powerline.lint.context import JStr, list_themes
+from powerline.lint.imp import WithPath, import_function, import_segment
+from powerline.lint.spec import Spec
+from powerline.lint.inspect import getconfigargspec
+
+
+list_sep = JStr(', ')
+
+
+generic_keys = set((
+ 'exclude_modes', 'include_modes',
+ 'exclude_function', 'include_function',
+ 'width', 'align',
+ 'name',
+ 'draw_soft_divider', 'draw_hard_divider',
+ 'priority',
+ 'after', 'before',
+ 'display'
+))
+type_keys = {
+ 'function': set(('function', 'args', 'draw_inner_divider')),
+ 'string': set(('contents', 'type', 'highlight_groups', 'divider_highlight_group')),
+ 'segment_list': set(('function', 'segments', 'args', 'type')),
+}
+required_keys = {
+ 'function': set(('function',)),
+ 'string': set(()),
+ 'segment_list': set(('function', 'segments',)),
+}
+highlight_keys = set(('highlight_groups', 'name'))
+
+
+def get_function_strings(function_name, context, ext):
+ if '.' in function_name:
+ module, function_name = function_name.rpartition('.')[::2]
+ else:
+ module = context[0][1].get(
+ 'default_module', MarkedUnicode('powerline.segments.' + ext, None))
+ return module, function_name
+
+
+def check_matcher_func(ext, match_name, data, context, echoerr):
+ havemarks(match_name)
+ import_paths = [os.path.expanduser(path) for path in context[0][1].get('common', {}).get('paths', [])]
+
+ match_module, separator, match_function = match_name.rpartition('.')
+ if not separator:
+ match_module = 'powerline.matchers.{0}'.format(ext)
+ match_function = match_name
+ with WithPath(import_paths):
+ try:
+ func = getattr(__import__(str(match_module), fromlist=[str(match_function)]), str(match_function))
+ except ImportError:
+ echoerr(context='Error while loading matcher functions',
+ problem='failed to load module {0}'.format(match_module),
+ problem_mark=match_name.mark)
+ return True, False, True
+ except AttributeError:
+ echoerr(context='Error while loading matcher functions',
+ problem='failed to load matcher function {0}'.format(match_function),
+ problem_mark=match_name.mark)
+ return True, False, True
+
+ if not callable(func):
+ echoerr(context='Error while loading matcher functions',
+ problem='loaded “function” {0} is not callable'.format(match_function),
+ problem_mark=match_name.mark)
+ return True, False, True
+
+	# func_code only exists on Python 2; Python 3 uses __code__.
+	func_code = getattr(func, '__code__', None) or getattr(func, 'func_code', None)
+	if func_code is not None and hasattr(func_code, 'co_argcount'):
+		if func_code.co_argcount != 1:
+			echoerr(
+				context='Error while loading matcher functions',
+				problem=(
+					'function {0} accepts {1} arguments instead of 1. '
+					'Are you sure it is the proper function?'
+				).format(match_function, func_code.co_argcount),
+ problem_mark=match_name.mark
+ )
+
+ return True, False, False
+
+
+def check_ext(ext, data, context, echoerr):
+ havemarks(ext)
+ hadsomedirs = False
+ hadproblem = False
+ if ext not in data['lists']['exts']:
+ hadproblem = True
+ echoerr(context='Error while loading {0} extension configuration'.format(ext),
+ context_mark=ext.mark,
+ problem='extension configuration does not exist')
+ else:
+ for typ in ('themes', 'colorschemes'):
+ if ext not in data['configs'][typ] and not data['configs']['top_' + typ]:
+ hadproblem = True
+ echoerr(context='Error while loading {0} extension configuration'.format(ext),
+ context_mark=ext.mark,
+ problem='{0} configuration does not exist'.format(typ))
+ else:
+ hadsomedirs = True
+ return hadsomedirs, hadproblem
+
+
+def check_config(d, theme, data, context, echoerr):
+ if len(context) == 4:
+ ext = context[-2][0]
+ else:
+ # local_themes
+ ext = context[-3][0]
+ if ext not in data['lists']['exts']:
+ echoerr(context='Error while loading {0} extension configuration'.format(ext),
+ context_mark=ext.mark,
+ problem='extension configuration does not exist')
+ return True, False, True
+ if (
+ (ext not in data['configs'][d] or theme not in data['configs'][d][ext])
+ and theme not in data['configs']['top_' + d]
+ ):
+ echoerr(context='Error while loading {0} from {1} extension configuration'.format(d[:-1], ext),
+ problem='failed to find configuration file {0}/{1}/{2}.json'.format(d, ext, theme),
+ problem_mark=theme.mark)
+ return True, False, True
+ return True, False, False
+
+
+def check_top_theme(theme, data, context, echoerr):
+ havemarks(theme)
+ if theme not in data['configs']['top_themes']:
+ echoerr(context='Error while checking extension configuration (key {key})'.format(key=context.key),
+ context_mark=context[-2][0].mark,
+ problem='failed to find top theme {0}'.format(theme),
+ problem_mark=theme.mark)
+ return True, False, True
+ return True, False, False
+
+
+def check_color(color, data, context, echoerr):
+ havemarks(color)
+ if (color not in data['colors_config'].get('colors', {})
+ and color not in data['colors_config'].get('gradients', {})):
+ echoerr(
+ context='Error while checking highlight group in colorscheme (key {key})'.format(
+ key=context.key),
+			problem='found nonexistent color or gradient {0}'.format(color),
+ problem_mark=color.mark
+ )
+ return True, False, True
+ return True, False, False
+
+
+def check_translated_group_name(group, data, context, echoerr):
+ return check_group(group, data, context, echoerr)
+
+
+def check_group(group, data, context, echoerr):
+ havemarks(group)
+ if not isinstance(group, unicode):
+ return True, False, False
+ colorscheme = data['colorscheme']
+ ext = data['ext']
+ configs = None
+ if ext:
+ def listed_key(d, k):
+ try:
+ return [d[k]]
+ except KeyError:
+ return []
+
+ if colorscheme == '__main__':
+ colorscheme_names = set(data['ext_colorscheme_configs'][ext])
+ colorscheme_names.update(data['top_colorscheme_configs'])
+ colorscheme_names.discard('__main__')
+ configs = [
+ (
+ name,
+ listed_key(data['ext_colorscheme_configs'][ext], name)
+ + listed_key(data['ext_colorscheme_configs'][ext], '__main__')
+ + listed_key(data['top_colorscheme_configs'], name)
+ )
+ for name in colorscheme_names
+ ]
+ else:
+ configs = [
+ (
+ colorscheme,
+ listed_key(data['ext_colorscheme_configs'][ext], colorscheme)
+ + listed_key(data['ext_colorscheme_configs'][ext], '__main__')
+ + listed_key(data['top_colorscheme_configs'], colorscheme)
+ )
+ ]
+ else:
+ try:
+ configs = [(colorscheme, [data['top_colorscheme_configs'][colorscheme]])]
+ except KeyError:
+ pass
+ hadproblem = False
+ for new_colorscheme, config_lst in configs:
+ not_found = []
+ new_data = data.copy()
+ new_data['colorscheme'] = new_colorscheme
+ for config in config_lst:
+ havemarks(config)
+ try:
+ group_data = config['groups'][group]
+ except KeyError:
+ not_found.append(config.mark.name)
+ else:
+ proceed, echo, chadproblem = check_group(
+ group_data,
+ new_data,
+ context,
+ echoerr,
+ )
+ if chadproblem:
+ hadproblem = True
+ if not proceed:
+ break
+ if not_found and len(not_found) == len(config_lst):
+ echoerr(
+ context='Error while checking group definition in colorscheme (key {key})'.format(
+ key=context.key),
+ problem='name {0} is not present anywhere in {1} {2} {3} colorschemes: {4}'.format(
+ group, len(not_found), ext, new_colorscheme, ', '.join(not_found)),
+ problem_mark=group.mark
+ )
+ hadproblem = True
+ return True, False, hadproblem
+
+
+def check_key_compatibility(segment, data, context, echoerr):
+ havemarks(segment)
+ segment_type = segment.get('type', MarkedUnicode('function', None))
+ havemarks(segment_type)
+
+ if segment_type not in type_keys:
+ echoerr(context='Error while checking segments (key {key})'.format(key=context.key),
+ problem='found segment with unknown type {0}'.format(segment_type),
+ problem_mark=segment_type.mark)
+ return False, False, True
+
+ hadproblem = False
+
+ keys = set(segment)
+ if not ((keys - generic_keys) < type_keys[segment_type]):
+ unknown_keys = keys - generic_keys - type_keys[segment_type]
+ echoerr(
+ context='Error while checking segments (key {key})'.format(key=context.key),
+ context_mark=context[-1][1].mark,
+ problem='found keys not used with the current segment type: {0}'.format(
+ list_sep.join(unknown_keys)),
+ problem_mark=list(unknown_keys)[0].mark
+ )
+ hadproblem = True
+
+ if not (keys >= required_keys[segment_type]):
+ missing_keys = required_keys[segment_type] - keys
+ echoerr(
+ context='Error while checking segments (key {key})'.format(key=context.key),
+ context_mark=context[-1][1].mark,
+ problem='found missing required keys: {0}'.format(
+ list_sep.join(missing_keys))
+ )
+ hadproblem = True
+
+ if not (segment_type == 'function' or (keys & highlight_keys)):
+ echoerr(
+ context='Error while checking segments (key {key})'.format(key=context.key),
+ context_mark=context[-1][1].mark,
+ problem=(
+ 'found missing keys required to determine highlight group. '
+ 'Either highlight_groups or name key must be present'
+ )
+ )
+ hadproblem = True
+
+ return True, False, hadproblem
+
+
+def check_segment_module(module, data, context, echoerr):
+ havemarks(module)
+ with WithPath(data['import_paths']):
+ try:
+ __import__(str(module))
+ except ImportError as e:
+ if echoerr.logger.level >= logging.DEBUG:
+ echoerr.logger.exception(e)
+ echoerr(context='Error while checking segments (key {key})'.format(key=context.key),
+ problem='failed to import module {0}'.format(module),
+ problem_mark=module.mark)
+ return True, False, True
+ return True, False, False
+
+
+def check_full_segment_data(segment, data, context, echoerr):
+ if 'name' not in segment and 'function' not in segment:
+ return True, False, False
+
+ ext = data['ext']
+ theme_segment_data = context[0][1].get('segment_data', {})
+ main_theme_name = data['main_config'].get('ext', {}).get(ext, {}).get('theme', None)
+ if not main_theme_name or data['theme'] == main_theme_name:
+ top_segment_data = {}
+ else:
+ top_segment_data = data['ext_theme_configs'].get(main_theme_name, {}).get('segment_data', {})
+
+ if segment.get('type', 'function') == 'function':
+ function_name = segment.get('function')
+ if function_name:
+ module, function_name = get_function_strings(function_name, context, ext)
+ names = [module + '.' + function_name, function_name]
+ else:
+ names = []
+ elif segment.get('name'):
+ names = [segment['name']]
+ else:
+ return True, False, False
+
+ segment_copy = segment.copy()
+
+ for key in ('before', 'after', 'args', 'contents'):
+ if key not in segment_copy:
+ for segment_data in [theme_segment_data, top_segment_data]:
+ for name in names:
+ try:
+ val = segment_data[name][key]
+ k = segment_data[name].keydict[key]
+ segment_copy[k] = val
+ except KeyError:
+ pass
+
+ return check_key_compatibility(segment_copy, data, context, echoerr)
+
+
+highlight_group_spec = Spec().ident().copy
+_highlight_group_spec = highlight_group_spec().context_message(
+ 'Error while checking function documentation while checking theme (key {key})')
+
+
+def check_hl_group_name(hl_group, context_mark, context, echoerr):
+ '''Check highlight group name: it should match naming conventions
+
+ :param str hl_group:
+ Checked group.
+ :param Mark context_mark:
+ Context mark. May be ``None``.
+ :param Context context:
+ Current context.
+ :param func echoerr:
+ Function used for error reporting.
+
+ :return: ``False`` if check succeeded and ``True`` if it failed.
+ '''
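+	# Usage sketch mirroring how this module calls it (see
+	# check_segment_function below):
+	#
+	#     if check_hl_group_name(divider_hl_group, function_name.mark, context, echoerr):
+	#         hadproblem = True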
+ return _highlight_group_spec.match(hl_group, context_mark=context_mark, context=context, echoerr=echoerr)[1]
+
+
+def check_segment_function(function_name, data, context, echoerr):
+ havemarks(function_name)
+ ext = data['ext']
+ module, function_name = get_function_strings(function_name, context, ext)
+ if context[-2][1].get('type', 'function') == 'function':
+ func = import_segment(function_name, data, context, echoerr, module=module)
+
+ if not func:
+ return True, False, True
+
+ hl_groups = []
+ divider_hl_group = None
+
+ hadproblem = False
+
+ if func.__doc__:
+ NO_H_G_USED_STR = 'No highlight groups are used (literal segment).'
+ H_G_USED_STR = 'Highlight groups used: '
+ LHGUS = len(H_G_USED_STR)
+ D_H_G_USED_STR = 'Divider highlight group used: '
+ LDHGUS = len(D_H_G_USED_STR)
+ pointer = 0
+ mark_name = '<{0} docstring>'.format(function_name)
+ for i, line in enumerate(func.__doc__.split('\n')):
+ if H_G_USED_STR in line:
+ idx = line.index(H_G_USED_STR) + LHGUS
+ if hl_groups is None:
+ idx -= LHGUS
+ mark = Mark(mark_name, i + 1, idx + 1, func.__doc__, pointer + idx)
+ echoerr(
+ context='Error while checking theme (key {key})'.format(key=context.key),
+ context_mark=function_name.mark,
+ problem=(
+ 'found highlight group definition in addition to sentence stating that '
+ 'no highlight groups are used'
+ ),
+ problem_mark=mark,
+ )
+ hadproblem = True
+ continue
+ hl_groups.append((
+ line[idx:],
+ (mark_name, i + 1, idx + 1, func.__doc__),
+ pointer + idx
+ ))
+ elif D_H_G_USED_STR in line:
+ idx = line.index(D_H_G_USED_STR) + LDHGUS + 2
+ mark = Mark(mark_name, i + 1, idx + 1, func.__doc__, pointer + idx)
+ divider_hl_group = MarkedUnicode(line[idx:-3], mark)
+ elif NO_H_G_USED_STR in line:
+ idx = line.index(NO_H_G_USED_STR)
+ if hl_groups:
+ mark = Mark(mark_name, i + 1, idx + 1, func.__doc__, pointer + idx)
+ echoerr(
+ context='Error while checking theme (key {key})'.format(key=context.key),
+ context_mark=function_name.mark,
+ problem=(
+ 'found sentence stating that no highlight groups are used '
+ 'in addition to highlight group definition'
+ ),
+ problem_mark=mark,
+ )
+ hadproblem = True
+ continue
+ hl_groups = None
+ pointer += len(line) + len('\n')
+
+ if divider_hl_group:
+ r = hl_exists(divider_hl_group, data, context, echoerr, allow_gradients=True)
+ if r:
+ echoerr(
+ context='Error while checking theme (key {key})'.format(key=context.key),
+ context_mark=function_name.mark,
+ problem=(
+ 'found highlight group {0} not defined in the following colorschemes: {1}\n'
+ '(Group name was obtained from function documentation.)'
+ ).format(divider_hl_group, list_sep.join(r)),
+ problem_mark=divider_hl_group.mark,
+ )
+ hadproblem = True
+ if check_hl_group_name(divider_hl_group, function_name.mark, context, echoerr):
+ hadproblem = True
+
+ if hl_groups:
+ greg = re.compile(r'``([^`]+)``( \(gradient\))?')
+ parsed_hl_groups = []
+ for line, mark_args, pointer in hl_groups:
+ for s in line.split(', '):
+ required_pack = []
+ sub_pointer = pointer
+ for subs in s.split(' or '):
+ match = greg.match(subs)
+ try:
+ if not match:
+ continue
+ hl_group = MarkedUnicode(
+ match.group(1),
+ Mark(*mark_args, pointer=sub_pointer + match.start(1))
+ )
+ if check_hl_group_name(hl_group, function_name.mark, context, echoerr):
+ hadproblem = True
+ gradient = bool(match.group(2))
+ required_pack.append((hl_group, gradient))
+ finally:
+ sub_pointer += len(subs) + len(' or ')
+ parsed_hl_groups.append(required_pack)
+ pointer += len(s) + len(', ')
+ del hl_group, gradient
+ for required_pack in parsed_hl_groups:
+ rs = [
+ hl_exists(hl_group, data, context, echoerr, allow_gradients=('force' if gradient else False))
+ for hl_group, gradient in required_pack
+ ]
+ if all(rs):
+ echoerr(
+ context='Error while checking theme (key {key})'.format(key=context.key),
+ problem=(
+ 'found highlight groups list ({0}) with all groups not defined in some colorschemes\n'
+ '(Group names were taken from function documentation.)'
+ ).format(list_sep.join((h[0] for h in required_pack))),
+ problem_mark=function_name.mark
+ )
+ for r, h in zip(rs, required_pack):
+ echoerr(
+ context='Error while checking theme (key {key})'.format(key=context.key),
+ problem='found highlight group {0} not defined in the following colorschemes: {1}'.format(
+ h[0], list_sep.join(r))
+ )
+ hadproblem = True
+ elif hl_groups is not None:
+ r = hl_exists(function_name, data, context, echoerr, allow_gradients=True)
+ if r:
+ echoerr(
+ context='Error while checking theme (key {key})'.format(key=context.key),
+ problem=(
+ 'found highlight group {0} not defined in the following colorschemes: {1}\n'
+ '(If not specified otherwise in documentation, '
+ 'highlight group for function segments\n'
+ 'is the same as the function name.)'
+ ).format(function_name, list_sep.join(r)),
+ problem_mark=function_name.mark
+ )
+ hadproblem = True
+
+ return True, False, hadproblem
+ elif context[-2][1].get('type') != 'segment_list':
+ if function_name not in context[0][1].get('segment_data', {}):
+ main_theme_name = data['main_config'].get('ext', {}).get(ext, {}).get('theme', None)
+ if data['theme'] == main_theme_name:
+ main_theme = {}
+ else:
+ main_theme = data['ext_theme_configs'].get(main_theme_name, {})
+ if (
+ function_name not in main_theme.get('segment_data', {})
+ and function_name not in data['ext_theme_configs'].get('__main__', {}).get('segment_data', {})
+ and not any(((function_name in theme.get('segment_data', {})) for theme in data['top_themes'].values()))
+ ):
+ echoerr(context='Error while checking segments (key {key})'.format(key=context.key),
+				        problem='found useless use of the name key (this name is not present in theme/segment_data)',
+ problem_mark=function_name.mark)
+
+ return True, False, False
+
+
+def hl_group_in_colorscheme(hl_group, cconfig, allow_gradients, data, context, echoerr):
+ havemarks(hl_group, cconfig)
+ if hl_group not in cconfig.get('groups', {}):
+ return False
+ elif not allow_gradients or allow_gradients == 'force':
+ group_config = cconfig['groups'][hl_group]
+ while isinstance(group_config, unicode):
+ try:
+ group_config = cconfig['groups'][group_config]
+ except KeyError:
+ # No such group. Error was already reported when checking
+ # colorschemes.
+ return True
+ havemarks(group_config)
+ hadgradient = False
+ for ckey in ('fg', 'bg'):
+ color = group_config.get(ckey)
+ if not color:
+ # No color. Error was already reported when checking
+ # colorschemes.
+ return True
+ havemarks(color)
+			# Gradients are only allowed for function segments. Note that the
+			# existence of *either* a color or a gradient should have been
+			# checked already
+ hascolor = color in data['colors_config'].get('colors', {})
+ hasgradient = color in data['colors_config'].get('gradients', {})
+ if hasgradient:
+ hadgradient = True
+ if allow_gradients is False and not hascolor and hasgradient:
+ echoerr(
+ context='Error while checking highlight group in theme (key {key})'.format(
+ key=context.key),
+ context_mark=hl_group.mark,
+ problem='group {0} is using gradient {1} instead of a color'.format(hl_group, color),
+ problem_mark=color.mark
+ )
+ return False
+ if allow_gradients == 'force' and not hadgradient:
+ echoerr(
+ context='Error while checking highlight group in theme (key {key})'.format(
+ key=context.key),
+ context_mark=hl_group.mark,
+				problem='group {0} should have at least one gradient color, but it has none'.format(hl_group),
+ problem_mark=group_config.mark
+ )
+ return False
+ return True
+
+
+def hl_exists(hl_group, data, context, echoerr, allow_gradients=False):
+ havemarks(hl_group)
+ ext = data['ext']
+ if ext not in data['colorscheme_configs']:
+ # No colorschemes. Error was already reported, no need to report it
+ # twice
+ return []
+ r = []
+	for colorscheme, cconfig in data['colorscheme_configs'][ext].items():
+		if not hl_group_in_colorscheme(hl_group, cconfig, allow_gradients, data, context, echoerr):
+			r.append(colorscheme)
+ return r
+
+
+def check_highlight_group(hl_group, data, context, echoerr):
+ havemarks(hl_group)
+ r = hl_exists(hl_group, data, context, echoerr)
+ if r:
+ echoerr(
+ context='Error while checking theme (key {key})'.format(key=context.key),
+ problem='found highlight group {0} not defined in the following colorschemes: {1}'.format(
+ hl_group, list_sep.join(r)),
+ problem_mark=hl_group.mark
+ )
+ return True, False, True
+ return True, False, False
+
+
+def check_highlight_groups(hl_groups, data, context, echoerr):
+ havemarks(hl_groups)
+ rs = [hl_exists(hl_group, data, context, echoerr) for hl_group in hl_groups]
+ if all(rs):
+ echoerr(
+ context='Error while checking theme (key {key})'.format(key=context.key),
+ problem='found highlight groups list ({0}) with all groups not defined in some colorschemes'.format(
+ list_sep.join((unicode(h) for h in hl_groups))),
+ problem_mark=hl_groups.mark
+ )
+ for r, hl_group in zip(rs, hl_groups):
+ echoerr(
+ context='Error while checking theme (key {key})'.format(key=context.key),
+ problem='found highlight group {0} not defined in the following colorschemes: {1}'.format(
+ hl_group, list_sep.join(r)),
+ problem_mark=hl_group.mark
+ )
+ return True, False, True
+ return True, False, False
+
+
+def check_segment_data_key(key, data, context, echoerr):
+ havemarks(key)
+ has_module_name = '.' in key
+ found = False
+ for ext, theme in list_themes(data, context):
+ for segments in theme.get('segments', {}).values():
+ for segment in segments:
+ if 'name' in segment:
+ if key == segment['name']:
+ found = True
+ break
+ else:
+ function_name = segment.get('function')
+ if function_name:
+ module, function_name = get_function_strings(function_name, ((None, theme),), ext)
+ if has_module_name:
+ full_name = module + '.' + function_name
+ if key == full_name:
+ found = True
+ break
+ else:
+ if key == function_name:
+ found = True
+ break
+ if found:
+ break
+ if found:
+ break
+ else:
+ if data['theme_type'] != 'top':
+ echoerr(context='Error while checking segment data',
+ problem='found key {0} that cannot be associated with any segment'.format(key),
+ problem_mark=key.mark)
+ return True, False, True
+
+ return True, False, False
+
+
+threaded_args_specs = {
+ 'interval': Spec().cmp('gt', 0.0),
+ 'update_first': Spec().type(bool),
+ 'shutdown_event': Spec().error('Shutdown event must be set by powerline'),
+}
+
+
+def check_args_variant(func, args, data, context, echoerr):
+ havemarks(args)
+ argspec = getconfigargspec(func)
+ present_args = set(args)
+ all_args = set(argspec.args)
+	# Slice explicitly: with an empty defaults tuple, args[:-0] would be empty.
+	required_args = set(argspec.args[:len(argspec.args) - len(argspec.defaults)])
+
+ hadproblem = False
+
+ if required_args - present_args:
+ echoerr(
+ context='Error while checking segment arguments (key {key})'.format(key=context.key),
+ context_mark=args.mark,
+ problem='some of the required keys are missing: {0}'.format(list_sep.join(required_args - present_args))
+ )
+ hadproblem = True
+
+ if not all_args >= present_args:
+ echoerr(context='Error while checking segment arguments (key {key})'.format(key=context.key),
+ context_mark=args.mark,
+ problem='found unknown keys: {0}'.format(list_sep.join(present_args - all_args)),
+ problem_mark=next(iter(present_args - all_args)).mark)
+ hadproblem = True
+
+ if isinstance(func, ThreadedSegment):
+ for key in set(threaded_args_specs) & present_args:
+ proceed, khadproblem = threaded_args_specs[key].match(
+ args[key],
+ args.mark,
+ data,
+ context.enter_key(args, key),
+ echoerr
+ )
+ if khadproblem:
+ hadproblem = True
+ if not proceed:
+ return hadproblem
+
+ return hadproblem
+
+
+def check_args(get_functions, args, data, context, echoerr):
+ new_echoerr = DelayedEchoErr(echoerr)
+ count = 0
+ hadproblem = False
+ for func in get_functions(data, context, new_echoerr):
+ count += 1
+ shadproblem = check_args_variant(func, args, data, context, echoerr)
+ if shadproblem:
+ hadproblem = True
+
+ if not count:
+ hadproblem = True
+ if new_echoerr:
+ new_echoerr.echo_all()
+ else:
+ echoerr(context='Error while checking segment arguments (key {key})'.format(key=context.key),
+ context_mark=context[-2][1].mark,
+ problem='no suitable segments found')
+
+ return True, False, hadproblem
+
+
+def get_one_segment_function(data, context, echoerr):
+ ext = data['ext']
+ function_name = context[-2][1].get('function')
+ if function_name:
+ module, function_name = get_function_strings(function_name, context, ext)
+ func = import_segment(function_name, data, context, echoerr, module=module)
+ if func:
+ yield func
+
+
+common_names = defaultdict(set)
+
+
+def register_common_name(name, cmodule, cname):
+ s = cmodule + '.' + cname
+ cmodule_mark = Mark('<common name definition>', 1, 1, s, 1)
+ cname_mark = Mark('<common name definition>', 1, len(cmodule) + 1, s, len(cmodule) + 1)
+ common_names[name].add((MarkedUnicode(cmodule, cmodule_mark), MarkedUnicode(cname, cname_mark)))
+
+
+def get_all_possible_functions(data, context, echoerr):
+ name = context[-2][0]
+ module, name = name.rpartition('.')[::2]
+ if module:
+ func = import_segment(name, data, context, echoerr, module=module)
+ if func:
+ yield func
+ else:
+ if name in common_names:
+ for cmodule, cname in common_names[name]:
+ cfunc = import_segment(cname, data, context, echoerr, module=MarkedUnicode(cmodule, None))
+ if cfunc:
+ yield cfunc
+ for ext, theme_config in list_themes(data, context):
+ for segments in theme_config.get('segments', {}).values():
+ for segment in segments:
+ if segment.get('type', 'function') == 'function':
+ function_name = segment.get('function')
+ current_name = segment.get('name')
+ if function_name:
+ module, function_name = get_function_strings(function_name, ((None, theme_config),), ext)
+ if current_name == name or function_name == name:
+ func = import_segment(function_name, data, context, echoerr, module=module)
+ if func:
+ yield func
+
+
+def check_exinclude_function(name, data, context, echoerr):
+ ext = data['ext']
+ module, name = name.rpartition('.')[::2]
+ if not module:
+ module = MarkedUnicode('powerline.selectors.' + ext, None)
+ func = import_function('selector', name, data, context, echoerr, module=module)
+ if not func:
+ return True, False, True
+ return True, False, False
+
+
+def check_log_file_level(this_level, data, context, echoerr):
+ '''Check handler level specified in :ref:`log_file key <config-common-log>`
+
+	This level must be greater than or equal to the level in :ref:`log_level
+	key <config-common-log_level>`.
+ '''
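+	# Example (values hypothetical): with ``"log_level": "WARNING"`` in the
+	# common configuration, a handler level of ``"INFO"`` fails this check,
+	# since logging.INFO < logging.WARNING.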
+ havemarks(this_level)
+ hadproblem = False
+ top_level = context[0][1].get('common', {}).get('log_level', 'WARNING')
+ top_level_str = top_level
+ top_level_mark = getattr(top_level, 'mark', None)
+ if (
+ not isinstance(top_level, unicode) or not hasattr(logging, top_level)
+ or not isinstance(this_level, unicode) or not hasattr(logging, this_level)
+ ):
+ return True, False, hadproblem
+ top_level = getattr(logging, top_level)
+ this_level_str = this_level
+ this_level_mark = this_level.mark
+ this_level = getattr(logging, this_level)
+ if this_level < top_level:
+ echoerr(
+ context='Error while checking log level index (key {key})'.format(
+ key=context.key),
+ context_mark=this_level_mark,
+			problem='found level that is less critical than the top level ({0} < {1})'.format(
+ this_level_str, top_level_str),
+ problem_mark=top_level_mark,
+ )
+ hadproblem = True
+ return True, False, hadproblem
+
+
+def check_logging_handler(handler_name, data, context, echoerr):
+ havemarks(handler_name)
+ import_paths = [os.path.expanduser(path) for path in context[0][1].get('common', {}).get('paths', [])]
+
+ handler_module, separator, handler_class = handler_name.rpartition('.')
+ if not separator:
+ handler_module = 'logging.handlers'
+ handler_class = handler_name
+ with WithPath(import_paths):
+ try:
+ handler = getattr(__import__(str(handler_module), fromlist=[str(handler_class)]), str(handler_class))
+ except ImportError:
+ echoerr(context='Error while loading logger class (key {key})'.format(key=context.key),
+ problem='failed to load module {0}'.format(handler_module),
+ problem_mark=handler_name.mark)
+ return True, False, True
+ except AttributeError:
+ echoerr(context='Error while loading logger class (key {key})'.format(key=context.key),
+ problem='failed to load handler class {0}'.format(handler_class),
+ problem_mark=handler_name.mark)
+ return True, False, True
+
+ if not issubclass(handler, logging.Handler):
+ echoerr(context='Error while loading logger class (key {key})'.format(key=context.key),
+ problem='loaded class {0} is not a logging.Handler subclass'.format(handler_class),
+ problem_mark=handler_name.mark)
+ return True, False, True
+
+ return True, False, False
diff --git a/powerline/lint/context.py b/powerline/lint/context.py
new file mode 100644
index 0000000..a48a283
--- /dev/null
+++ b/powerline/lint/context.py
@@ -0,0 +1,68 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import itertools
+
+from powerline.lib.unicode import unicode
+from powerline.lint.markedjson.markedvalue import MarkedUnicode
+from powerline.lint.selfcheck import havemarks
+
+
+class JStr(unicode):
+ def join(self, iterable):
+ return super(JStr, self).join((unicode(item) for item in iterable))
+
+
+key_sep = JStr('/')
+
+
+def list_themes(data, context):
+ theme_type = data['theme_type']
+ ext = data['ext']
+ main_theme_name = data['main_config'].get('ext', {}).get(ext, {}).get('theme', None)
+ is_main_theme = (data['theme'] == main_theme_name)
+ if theme_type == 'top':
+ return list(itertools.chain(*[
+ [(theme_ext, theme) for theme in theme_configs.values()]
+ for theme_ext, theme_configs in data['theme_configs'].items()
+ ]))
+ elif theme_type == 'main' or is_main_theme:
+ return [(ext, theme) for theme in data['ext_theme_configs'].values()]
+ else:
+ return [(ext, context[0][1])]
+
+
+class Context(tuple):
+ for func in dir(tuple):
+ if func in ('__getitem__', '__init__', '__getattribute__', '__len__', '__iter__'):
+ continue
+ exec((
+ 'def {0}(self, *args, **kwargs):\n'
+ ' raise TypeError("{0} is not allowed for Context")'
+ ).format(func))
+ del func
+
+ __slots__ = ()
+
+ def __new__(cls, base, context_key=None, context_value=None):
+ if context_key is not None:
+ assert(context_value is not None)
+ assert(type(base) is Context)
+ havemarks(context_key, context_value)
+ return tuple.__new__(cls, tuple.__add__(base, ((context_key, context_value),)))
+ else:
+ havemarks(base)
+ return tuple.__new__(cls, ((MarkedUnicode('', base.mark), base),))
+
+ @property
+ def key(self):
+ return key_sep.join((c[0] for c in self))
+
+ def enter_key(self, value, key):
+ return self.enter(value.keydict[key], value[key])
+
+ def enter_item(self, name, item):
+ return self.enter(MarkedUnicode(name, item.mark), item)
+
+ def enter(self, context_key, context_value):
+ return Context.__new__(Context, self, context_key, context_value)
diff --git a/powerline/lint/imp.py b/powerline/lint/imp.py
new file mode 100644
index 0000000..399654e
--- /dev/null
+++ b/powerline/lint/imp.py
@@ -0,0 +1,56 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import sys
+
+from powerline.lint.selfcheck import havemarks
+
+
+class WithPath(object):
+ def __init__(self, import_paths):
+ self.import_paths = import_paths
+
+ def __enter__(self):
+ self.oldpath = sys.path
+ sys.path = self.import_paths + sys.path
+
+ def __exit__(self, *args):
+ sys.path = self.oldpath
+
+
+def import_function(function_type, name, data, context, echoerr, module):
+ havemarks(name, module)
+
+ if module == 'powerline.segments.i3wm' and name == 'workspaces':
+ echoerr(context='Warning while checking segments (key {key})'.format(key=context.key),
+ context_mark=name.mark,
+ problem='segment {0} from {1} is deprecated'.format(name, module),
+ problem_mark=module.mark)
+
+ with WithPath(data['import_paths']):
+ try:
+ func = getattr(__import__(str(module), fromlist=[str(name)]), str(name))
+ except ImportError:
+ echoerr(context='Error while checking segments (key {key})'.format(key=context.key),
+ context_mark=name.mark,
+ problem='failed to import module {0}'.format(module),
+ problem_mark=module.mark)
+ return None
+ except AttributeError:
+ echoerr(context='Error while loading {0} function (key {key})'.format(function_type, key=context.key),
+ problem='failed to load function {0} from module {1}'.format(name, module),
+ problem_mark=name.mark)
+ return None
+
+ if not callable(func):
+ echoerr(context='Error while checking segments (key {key})'.format(key=context.key),
+ context_mark=name.mark,
+ problem='imported “function” {0} from module {1} is not callable'.format(name, module),
+ problem_mark=module.mark)
+ return None
+
+ return func
+
+
+def import_segment(*args, **kwargs):
+ return import_function('segment', *args, **kwargs)
diff --git a/powerline/lint/inspect.py b/powerline/lint/inspect.py
new file mode 100644
index 0000000..15bb610
--- /dev/null
+++ b/powerline/lint/inspect.py
@@ -0,0 +1,63 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+from inspect import ArgSpec, getargspec
+
+from powerline.segments import Segment
+
+
+def getconfigargspec(obj):
+ if hasattr(obj, 'powerline_origin'):
+ obj = obj.powerline_origin
+
+ args = []
+ defaults = []
+
+ if isinstance(obj, Segment):
+ additional_args = obj.additional_args()
+ argspecobjs = obj.argspecobjs()
+ get_omitted_args = obj.omitted_args
+ else:
+ additional_args = ()
+ argspecobjs = ((None, obj),)
+ get_omitted_args = lambda *args: ()
+
+ for arg in additional_args:
+ args.append(arg[0])
+ if len(arg) > 1:
+ defaults.append(arg[1])
+
+ requires_segment_info = hasattr(obj, 'powerline_requires_segment_info')
+ requires_filesystem_watcher = hasattr(obj, 'powerline_requires_filesystem_watcher')
+
+ for name, method in argspecobjs:
+ argspec = getargspec(method)
+ omitted_args = get_omitted_args(name, method)
+ largs = len(argspec.args)
+ for i, arg in enumerate(reversed(argspec.args)):
+ if (
+ largs - (i + 1) in omitted_args
+ or arg in omitted_args
+ or arg == 'pl'
+ or arg == 'self'
+ or (arg == 'create_watcher' and requires_filesystem_watcher)
+ or (arg == 'segment_info' and requires_segment_info)
+ ):
+ continue
+ if argspec.defaults and len(argspec.defaults) > i:
+ if arg in args:
+ idx = args.index(arg)
+ if len(args) - idx > len(defaults):
+ args.pop(idx)
+ else:
+ continue
+ default = argspec.defaults[-(i + 1)]
+ defaults.append(default)
+ args.append(arg)
+ else:
+ if arg not in args:
+ args.insert(0, arg)
+
+ return ArgSpec(args=args, varargs=None, keywords=None, defaults=tuple(defaults))
diff --git a/powerline/lint/markedjson/__init__.py b/powerline/lint/markedjson/__init__.py
new file mode 100644
index 0000000..dea5faf
--- /dev/null
+++ b/powerline/lint/markedjson/__init__.py
@@ -0,0 +1,19 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+from powerline.lint.markedjson.loader import Loader
+
+
+def load(stream, Loader=Loader):
+ '''Parse JSON value and produce the corresponding Python object
+
+ :return:
+ (object, hadproblem) tuple where the first item is the constructed
+ JSON object and the second is true if there were errors while
+ loading the JSON stream.
+ '''
+ loader = Loader(stream)
+ try:
+ r = loader.get_single_data()
+ return r, loader.haserrors
+ finally:
+ loader.dispose()
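+
+
+# A minimal usage sketch (the path is illustrative):
+#
+#     from powerline.lint.markedjson import load
+#     with open('config.json', 'rb') as fp:
+#         value, hadproblem = load(fp)
+#     # `value` is a marked object tree: its scalars, lists and
+#     # dictionaries carry a `.mark` with file name, line and column.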
diff --git a/powerline/lint/markedjson/composer.py b/powerline/lint/markedjson/composer.py
new file mode 100644
index 0000000..bd5620d
--- /dev/null
+++ b/powerline/lint/markedjson/composer.py
@@ -0,0 +1,119 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+from powerline.lint.markedjson import nodes
+from powerline.lint.markedjson import events
+from powerline.lint.markedjson.error import MarkedError
+
+
+__all__ = ['Composer', 'ComposerError']
+
+
+class ComposerError(MarkedError):
+ pass
+
+
+class Composer:
+ def __init__(self):
+ pass
+
+ def check_node(self):
+ # Drop the STREAM-START event.
+ if self.check_event(events.StreamStartEvent):
+ self.get_event()
+
+ # Check whether there are more documents available.
+ return not self.check_event(events.StreamEndEvent)
+
+ def get_node(self):
+ # Get the root node of the next document.
+ if not self.check_event(events.StreamEndEvent):
+ return self.compose_document()
+
+ def get_single_node(self):
+ # Drop the STREAM-START event.
+ self.get_event()
+
+ # Compose a document if the stream is not empty.
+ document = None
+ if not self.check_event(events.StreamEndEvent):
+ document = self.compose_document()
+
+ # Ensure that the stream contains no more documents.
+ if not self.check_event(events.StreamEndEvent):
+ event = self.get_event()
+ raise ComposerError(
+ 'expected a single document in the stream',
+ document.start_mark,
+ 'but found another document',
+ event.start_mark
+ )
+
+ # Drop the STREAM-END event.
+ self.get_event()
+
+ return document
+
+ def compose_document(self):
+ # Drop the DOCUMENT-START event.
+ self.get_event()
+
+ # Compose the root node.
+ node = self.compose_node(None, None)
+
+ # Drop the DOCUMENT-END event.
+ self.get_event()
+
+ return node
+
+ def compose_node(self, parent, index):
+ self.descend_resolver(parent, index)
+ if self.check_event(events.ScalarEvent):
+ node = self.compose_scalar_node()
+ elif self.check_event(events.SequenceStartEvent):
+ node = self.compose_sequence_node()
+ elif self.check_event(events.MappingStartEvent):
+ node = self.compose_mapping_node()
+ self.ascend_resolver()
+ return node
+
+ def compose_scalar_node(self):
+ event = self.get_event()
+ tag = event.tag
+ if tag is None or tag == '!':
+ tag = self.resolve(nodes.ScalarNode, event.value, event.implicit, event.start_mark)
+ node = nodes.ScalarNode(tag, event.value, event.start_mark, event.end_mark, style=event.style)
+ return node
+
+ def compose_sequence_node(self):
+ start_event = self.get_event()
+ tag = start_event.tag
+ if tag is None or tag == '!':
+ tag = self.resolve(nodes.SequenceNode, None, start_event.implicit)
+ node = nodes.SequenceNode(tag, [], start_event.start_mark, None, flow_style=start_event.flow_style)
+ index = 0
+ while not self.check_event(events.SequenceEndEvent):
+ node.value.append(self.compose_node(node, index))
+ index += 1
+ end_event = self.get_event()
+ node.end_mark = end_event.end_mark
+ return node
+
+ def compose_mapping_node(self):
+ start_event = self.get_event()
+ tag = start_event.tag
+ if tag is None or tag == '!':
+ tag = self.resolve(nodes.MappingNode, None, start_event.implicit)
+ node = nodes.MappingNode(tag, [], start_event.start_mark, None, flow_style=start_event.flow_style)
+ while not self.check_event(events.MappingEndEvent):
+ # key_event = self.peek_event()
+ item_key = self.compose_node(node, None)
+ # if item_key in node.value:
+ # raise ComposerError('while composing a mapping', start_event.start_mark,
+ # 'found duplicate key', key_event.start_mark)
+ item_value = self.compose_node(node, item_key)
+ # node.value[item_key] = item_value
+ node.value.append((item_key, item_value))
+ end_event = self.get_event()
+ node.end_mark = end_event.end_mark
+ return node
diff --git a/powerline/lint/markedjson/constructor.py b/powerline/lint/markedjson/constructor.py
new file mode 100644
index 0000000..372d84b
--- /dev/null
+++ b/powerline/lint/markedjson/constructor.py
@@ -0,0 +1,285 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import types
+
+try:
+ from collections.abc import Hashable
+except ImportError: # Python 2
+ from collections import Hashable
+
+from functools import wraps
+
+from powerline.lint.markedjson.error import MarkedError
+
+from powerline.lint.markedjson import nodes
+from powerline.lint.markedjson.markedvalue import gen_marked_value
+from powerline.lib.unicode import unicode
+
+
+def marked(func):
+ @wraps(func)
+ def f(self, node, *args, **kwargs):
+ return gen_marked_value(func(self, node, *args, **kwargs), node.start_mark)
+ return f
+
+
+class ConstructorError(MarkedError):
+ pass
+
+
+class BaseConstructor:
+ yaml_constructors = {}
+
+ def __init__(self):
+ self.constructed_objects = {}
+ self.state_generators = []
+ self.deep_construct = False
+
+ def check_data(self):
+ # Check whether there are more documents available.
+ return self.check_node()
+
+ def get_data(self):
+ # Construct and return the next document.
+ if self.check_node():
+ return self.construct_document(self.get_node())
+
+ def get_single_data(self):
+ # Ensure that the stream contains a single document and construct it.
+ node = self.get_single_node()
+ if node is not None:
+ return self.construct_document(node)
+ return None
+
+ def construct_document(self, node):
+ data = self.construct_object(node)
+ while self.state_generators:
+ state_generators = self.state_generators
+ self.state_generators = []
+ for generator in state_generators:
+ for dummy in generator:
+ pass
+ self.constructed_objects = {}
+ self.deep_construct = False
+ return data
+
+ def construct_object(self, node, deep=False):
+ if node in self.constructed_objects:
+ return self.constructed_objects[node]
+ if deep:
+ old_deep = self.deep_construct
+ self.deep_construct = True
+ constructor = None
+ tag_suffix = None
+ if node.tag in self.yaml_constructors:
+ constructor = self.yaml_constructors[node.tag]
+ else:
+ raise ConstructorError(None, None, 'no constructor for tag %s' % node.tag)
+ if tag_suffix is None:
+ data = constructor(self, node)
+ else:
+ data = constructor(self, tag_suffix, node)
+ if isinstance(data, types.GeneratorType):
+ generator = data
+ data = next(generator)
+ if self.deep_construct:
+ for dummy in generator:
+ pass
+ else:
+ self.state_generators.append(generator)
+ self.constructed_objects[node] = data
+ if deep:
+ self.deep_construct = old_deep
+ return data
+
+ @marked
+ def construct_scalar(self, node):
+ if not isinstance(node, nodes.ScalarNode):
+ raise ConstructorError(
+ None, None,
+ 'expected a scalar node, but found %s' % node.id,
+ node.start_mark
+ )
+ return node.value
+
+ def construct_sequence(self, node, deep=False):
+ if not isinstance(node, nodes.SequenceNode):
+ raise ConstructorError(
+ None, None,
+ 'expected a sequence node, but found %s' % node.id,
+ node.start_mark
+ )
+ return [
+ self.construct_object(child, deep=deep)
+ for child in node.value
+ ]
+
+ @marked
+ def construct_mapping(self, node, deep=False):
+ if not isinstance(node, nodes.MappingNode):
+ raise ConstructorError(
+ None, None,
+ 'expected a mapping node, but found %s' % node.id,
+ node.start_mark
+ )
+ mapping = {}
+ for key_node, value_node in node.value:
+ key = self.construct_object(key_node, deep=deep)
+ if not isinstance(key, Hashable):
+ self.echoerr(
+ 'While constructing a mapping', node.start_mark,
+ 'found unhashable key', key_node.start_mark
+ )
+ continue
+ elif type(key.value) != unicode:
+ self.echoerr(
+ 'Error while constructing a mapping', node.start_mark,
+ 'found key that is not a string', key_node.start_mark
+ )
+ continue
+ elif key in mapping:
+ self.echoerr(
+ 'Error while constructing a mapping', node.start_mark,
+ 'found duplicate key', key_node.start_mark
+ )
+ continue
+ value = self.construct_object(value_node, deep=deep)
+ mapping[key] = value
+ return mapping
+
+ @classmethod
+ def add_constructor(cls, tag, constructor):
+ if 'yaml_constructors' not in cls.__dict__:
+ cls.yaml_constructors = cls.yaml_constructors.copy()
+ cls.yaml_constructors[tag] = constructor
+
+
+class Constructor(BaseConstructor):
+ def construct_scalar(self, node):
+ if isinstance(node, nodes.MappingNode):
+ for key_node, value_node in node.value:
+ if key_node.tag == 'tag:yaml.org,2002:value':
+ return self.construct_scalar(value_node)
+ return BaseConstructor.construct_scalar(self, node)
+
+ def flatten_mapping(self, node):
+ merge = []
+ index = 0
+ while index < len(node.value):
+ key_node, value_node = node.value[index]
+ if key_node.tag == 'tag:yaml.org,2002:merge':
+ del node.value[index]
+ if isinstance(value_node, nodes.MappingNode):
+ self.flatten_mapping(value_node)
+ merge.extend(value_node.value)
+ elif isinstance(value_node, nodes.SequenceNode):
+ submerge = []
+ for subnode in value_node.value:
+ if not isinstance(subnode, nodes.MappingNode):
+ raise ConstructorError(
+ 'while constructing a mapping',
+ node.start_mark,
+ 'expected a mapping for merging, but found %s' % subnode.id,
+ subnode.start_mark
+ )
+ self.flatten_mapping(subnode)
+ submerge.append(subnode.value)
+ submerge.reverse()
+ for value in submerge:
+ merge.extend(value)
+ else:
+ raise ConstructorError(
+ 'while constructing a mapping',
+ node.start_mark,
+ ('expected a mapping or list of mappings for merging, but found %s' % value_node.id),
+ value_node.start_mark
+ )
+ elif key_node.tag == 'tag:yaml.org,2002:value':
+ key_node.tag = 'tag:yaml.org,2002:str'
+ index += 1
+ else:
+ index += 1
+ if merge:
+ node.value = merge + node.value
+
+ def construct_mapping(self, node, deep=False):
+ if isinstance(node, nodes.MappingNode):
+ self.flatten_mapping(node)
+ return BaseConstructor.construct_mapping(self, node, deep=deep)
+
+ @marked
+ def construct_yaml_null(self, node):
+ self.construct_scalar(node)
+ return None
+
+ @marked
+ def construct_yaml_bool(self, node):
+ value = self.construct_scalar(node).value
+ return bool(value)
+
+ @marked
+ def construct_yaml_int(self, node):
+ value = self.construct_scalar(node).value
+ sign = +1
+ if value[0] == '-':
+ sign = -1
+ if value[0] in '+-':
+ value = value[1:]
+ if value == '0':
+ return 0
+ else:
+ return sign * int(value)
+
+ @marked
+ def construct_yaml_float(self, node):
+ value = self.construct_scalar(node).value
+ sign = +1
+ if value[0] == '-':
+ sign = -1
+ if value[0] in '+-':
+ value = value[1:]
+ return sign * float(value)
+
+ def construct_yaml_str(self, node):
+ return self.construct_scalar(node)
+
+ def construct_yaml_seq(self, node):
+ data = gen_marked_value([], node.start_mark)
+ yield data
+ data.extend(self.construct_sequence(node))
+
+ def construct_yaml_map(self, node):
+ data = gen_marked_value({}, node.start_mark)
+ yield data
+ value = self.construct_mapping(node)
+ data.update(value)
+
+ def construct_undefined(self, node):
+ raise ConstructorError(
+ None, None,
+ 'could not determine a constructor for the tag %r' % node.tag,
+ node.start_mark
+ )
+
+
+Constructor.add_constructor(
+ 'tag:yaml.org,2002:null', Constructor.construct_yaml_null)
+
+Constructor.add_constructor(
+ 'tag:yaml.org,2002:bool', Constructor.construct_yaml_bool)
+
+Constructor.add_constructor(
+ 'tag:yaml.org,2002:int', Constructor.construct_yaml_int)
+
+Constructor.add_constructor(
+ 'tag:yaml.org,2002:float', Constructor.construct_yaml_float)
+
+Constructor.add_constructor(
+ 'tag:yaml.org,2002:str', Constructor.construct_yaml_str)
+
+Constructor.add_constructor(
+ 'tag:yaml.org,2002:seq', Constructor.construct_yaml_seq)
+
+Constructor.add_constructor(
+ 'tag:yaml.org,2002:map', Constructor.construct_yaml_map)
+
+Constructor.add_constructor(
+ None, Constructor.construct_undefined)
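+
+# With these registrations construct_object() dispatches on the tag the
+# resolver assigned to each node: e.g. a scalar resolved to
+# 'tag:yaml.org,2002:int' is built by construct_yaml_int and comes back
+# as a MarkedInt; unknown tags fall through to construct_undefined.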
diff --git a/powerline/lint/markedjson/error.py b/powerline/lint/markedjson/error.py
new file mode 100644
index 0000000..732120b
--- /dev/null
+++ b/powerline/lint/markedjson/error.py
@@ -0,0 +1,241 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import sys
+import re
+
+from powerline.lib.encoding import get_preferred_output_encoding
+
+
+NON_PRINTABLE_STR = (
+ '[^'
+ # ASCII control characters: 0x00-0x1F
+ + '\t\n' # Tab, newline: allowed ASCII control characters
+ + '\x20-\x7E' # ASCII printable characters
+ # Unicode control characters: 0x7F-0x9F
+ + '\u0085' # Allowed unicode control character: next line character
+ + '\u00A0-\uD7FF'
+ # Surrogate escapes: 0xD800-0xDFFF
+ + '\uE000-\uFFFD'
+ + ((
+ '\uD800-\uDFFF'
+ ) if sys.maxunicode < 0x10FFFF else (
+ '\U00010000-\U0010FFFF'
+ ))
+ + ']'
+ + ((
+ # Paired surrogate escapes: allowed in UCS-2 builds as the only way to
+ # represent characters above 0xFFFF. Only paired variant is allowed.
+ '|(?<![\uD800-\uDBFF])[\uDC00-\uDFFF]'
+ + '|[\uD800-\uDBFF](?![\uDC00-\uDFFF])'
+ ) if sys.maxunicode < 0x10FFFF else (
+ ''
+ ))
+)
+NON_PRINTABLE_RE = re.compile(NON_PRINTABLE_STR)
+
+
+def repl(s):
+ return '<x%04x>' % ord(s.group())
+
+
+def strtrans(s):
+ return NON_PRINTABLE_RE.sub(repl, s.replace('\t', '>---'))
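+
+
+# Example: strtrans('a\tb\0') returns 'a>---b<x0000>': tabs are made
+# visible as '>---' and other non-printable characters are rendered as
+# <x....> hexadecimal escapes.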
+
+
+class Mark:
+ def __init__(self, name, line, column, buffer, pointer, old_mark=None, merged_marks=None):
+ self.name = name
+ self.line = line
+ self.column = column
+ self.buffer = buffer
+ self.pointer = pointer
+ self.old_mark = old_mark
+ self.merged_marks = merged_marks or []
+
+ def copy(self):
+ return Mark(self.name, self.line, self.column, self.buffer, self.pointer, self.old_mark, self.merged_marks[:])
+
+ def get_snippet(self, indent=4, max_length=75):
+ if self.buffer is None:
+ return None
+ head = ''
+ start = self.pointer
+ while start > 0 and self.buffer[start - 1] not in '\0\n':
+ start -= 1
+ if self.pointer - start > max_length / 2 - 1:
+ head = ' ... '
+ start += 5
+ break
+ tail = ''
+ end = self.pointer
+ while end < len(self.buffer) and self.buffer[end] not in '\0\n':
+ end += 1
+ if end - self.pointer > max_length / 2 - 1:
+ tail = ' ... '
+ end -= 5
+ break
+ snippet = [self.buffer[start:self.pointer], self.buffer[self.pointer], self.buffer[self.pointer + 1:end]]
+ snippet = [strtrans(s) for s in snippet]
+ return (
+ ' ' * indent + head + ''.join(snippet) + tail + '\n'
+ + ' ' * (indent + len(head) + len(snippet[0])) + '^'
+ )
+
+ def advance_string(self, diff):
+ ret = self.copy()
+ # FIXME Currently does not work properly with escaped strings.
+ ret.column += diff
+ ret.pointer += diff
+ return ret
+
+ def set_old_mark(self, old_mark):
+ if self is old_mark:
+ return
+ checked_marks = set([id(self)])
+ older_mark = old_mark
+ while True:
+ if id(older_mark) in checked_marks:
+ raise ValueError('Trying to set recursive marks')
+ checked_marks.add(id(older_mark))
+ older_mark = older_mark.old_mark
+ if not older_mark:
+ break
+ self.old_mark = old_mark
+
+ def set_merged_mark(self, merged_mark):
+ self.merged_marks.append(merged_mark)
+
+ def to_string(self, indent=0, head_text='in ', add_snippet=True):
+ mark = self
+ where = ''
+ processed_marks = set()
+ while mark:
+ indentstr = ' ' * indent
+ where += ('%s %s"%s", line %d, column %d' % (
+ indentstr, head_text, mark.name, mark.line + 1, mark.column + 1))
+ if add_snippet:
+ snippet = mark.get_snippet(indent=(indent + 4))
+ if snippet:
+ where += ':\n' + snippet
+ if mark.merged_marks:
+ where += '\n' + indentstr + ' with additionally merged\n'
+ where += mark.merged_marks[0].to_string(indent + 4, head_text='', add_snippet=False)
+ for mmark in mark.merged_marks[1:]:
+ where += '\n' + indentstr + ' and\n'
+ where += mmark.to_string(indent + 4, head_text='', add_snippet=False)
+ if add_snippet:
+ processed_marks.add(id(mark))
+ if mark.old_mark:
+ where += '\n' + indentstr + ' which replaced value\n'
+ indent += 4
+ mark = mark.old_mark
+ if id(mark) in processed_marks:
+ raise ValueError('Trying to dump recursive mark')
+ return where
+
+ if sys.version_info < (3,):
+ def __str__(self):
+ return self.to_string().encode('utf-8')
+
+ def __unicode__(self):
+ return self.to_string()
+ else:
+ def __str__(self):
+ return self.to_string()
+
+ def __eq__(self, other):
+ return self is other or (
+ self.name == other.name
+ and self.line == other.line
+ and self.column == other.column
+ )
+
+
+if sys.version_info < (3,):
+ def echoerr(*args, **kwargs):
+ stream = kwargs.pop('stream', sys.stderr)
+ stream.write('\n')
+ stream.write((format_error(*args, **kwargs) + '\n').encode(get_preferred_output_encoding()))
+else:
+ def echoerr(*args, **kwargs):
+ stream = kwargs.pop('stream', sys.stderr)
+ stream.write('\n')
+ stream.write(format_error(*args, **kwargs) + '\n')
+
+
+def format_error(context=None, context_mark=None, problem=None, problem_mark=None, note=None, indent=0):
+ lines = []
+ indentstr = ' ' * indent
+ if context is not None:
+ lines.append(indentstr + context)
+ if (
+ context_mark is not None
+ and (
+ problem is None or problem_mark is None
+ or context_mark != problem_mark
+ )
+ ):
+ lines.append(context_mark.to_string(indent=indent))
+ if problem is not None:
+ lines.append(indentstr + problem)
+ if problem_mark is not None:
+ lines.append(problem_mark.to_string(indent=indent))
+ if note is not None:
+ lines.append(indentstr + note)
+ return '\n'.join(lines)
+
+
+class MarkedError(Exception):
+ def __init__(self, context=None, context_mark=None, problem=None, problem_mark=None, note=None):
+ Exception.__init__(self, format_error(context, context_mark, problem, problem_mark, note))
+
+
+class EchoErr(object):
+ __slots__ = ('echoerr', 'logger', 'indent')
+
+ def __init__(self, echoerr, logger, indent=0):
+ self.echoerr = echoerr
+ self.logger = logger
+ self.indent = indent
+
+ def __call__(self, **kwargs):
+ kwargs = kwargs.copy()
+ kwargs.setdefault('indent', self.indent)
+ self.echoerr(**kwargs)
+
+
+class DelayedEchoErr(EchoErr):
+ __slots__ = ('echoerr', 'logger', 'errs', 'message', 'separator_message', 'indent', 'indent_shift')
+
+ def __init__(self, echoerr, message='', separator_message=''):
+ super(DelayedEchoErr, self).__init__(echoerr, echoerr.logger)
+ self.errs = [[]]
+ self.message = message
+ self.separator_message = separator_message
+ self.indent_shift = (4 if message or separator_message else 0)
+ self.indent = echoerr.indent + self.indent_shift
+
+ def __call__(self, **kwargs):
+ kwargs = kwargs.copy()
+ kwargs['indent'] = kwargs.get('indent', 0) + self.indent
+ self.errs[-1].append(kwargs)
+
+ def next_variant(self):
+ self.errs.append([])
+
+ def echo_all(self):
+ if self.message:
+ self.echoerr(problem=self.message, indent=(self.indent - self.indent_shift))
+ for variant in self.errs:
+ if not variant:
+ continue
+ if self.separator_message and variant is not self.errs[0]:
+ self.echoerr(problem=self.separator_message, indent=(self.indent - self.indent_shift))
+ for kwargs in variant:
+ self.echoerr(**kwargs)
+
+ def __nonzero__(self):
+ return any(self.errs)
+
+ __bool__ = __nonzero__
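+
+
+# A minimal usage sketch: buffer errors for several alternative checks
+# and only print them if every variant failed (assuming `echoerr` is an
+# existing EchoErr instance):
+#
+#     delayed = DelayedEchoErr(echoerr, 'none of the variants matched', 'or')
+#     delayed(problem='variant 1 failed')   # buffered, not printed yet
+#     delayed.next_variant()
+#     delayed(problem='variant 2 failed')
+#     delayed.echo_all()                    # emit everything at once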
diff --git a/powerline/lint/markedjson/events.py b/powerline/lint/markedjson/events.py
new file mode 100644
index 0000000..ef8a70e
--- /dev/null
+++ b/powerline/lint/markedjson/events.py
@@ -0,0 +1,97 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+
+# Abstract classes.
+class Event(object):
+ def __init__(self, start_mark=None, end_mark=None):
+ self.start_mark = start_mark
+ self.end_mark = end_mark
+
+ def __repr__(self):
+ attributes = [
+ key for key in ['implicit', 'value']
+ if hasattr(self, key)
+ ]
+ arguments = ', '.join([
+ '%s=%r' % (key, getattr(self, key))
+ for key in attributes
+ ])
+ return '%s(%s)' % (self.__class__.__name__, arguments)
+
+
+class NodeEvent(Event):
+ def __init__(self, start_mark=None, end_mark=None):
+ self.start_mark = start_mark
+ self.end_mark = end_mark
+
+
+class CollectionStartEvent(NodeEvent):
+ def __init__(self, implicit, start_mark=None, end_mark=None, flow_style=None):
+ self.tag = None
+ self.implicit = implicit
+ self.start_mark = start_mark
+ self.end_mark = end_mark
+ self.flow_style = flow_style
+
+
+class CollectionEndEvent(Event):
+ pass
+
+
+# Implementations.
+class StreamStartEvent(Event):
+ def __init__(self, start_mark=None, end_mark=None, encoding=None):
+ self.start_mark = start_mark
+ self.end_mark = end_mark
+ self.encoding = encoding
+
+
+class StreamEndEvent(Event):
+ pass
+
+
+class DocumentStartEvent(Event):
+ def __init__(self, start_mark=None, end_mark=None, explicit=None, version=None, tags=None):
+ self.start_mark = start_mark
+ self.end_mark = end_mark
+ self.explicit = explicit
+ self.version = version
+ self.tags = tags
+
+
+class DocumentEndEvent(Event):
+ def __init__(self, start_mark=None, end_mark=None, explicit=None):
+ self.start_mark = start_mark
+ self.end_mark = end_mark
+ self.explicit = explicit
+
+
+class AliasEvent(NodeEvent):
+ pass
+
+
+class ScalarEvent(NodeEvent):
+ def __init__(self, implicit, value, start_mark=None, end_mark=None, style=None):
+ self.tag = None
+ self.implicit = implicit
+ self.value = value
+ self.start_mark = start_mark
+ self.end_mark = end_mark
+ self.style = style
+
+
+class SequenceStartEvent(CollectionStartEvent):
+ pass
+
+
+class SequenceEndEvent(CollectionEndEvent):
+ pass
+
+
+class MappingStartEvent(CollectionStartEvent):
+ pass
+
+
+class MappingEndEvent(CollectionEndEvent):
+ pass
diff --git a/powerline/lint/markedjson/loader.py b/powerline/lint/markedjson/loader.py
new file mode 100644
index 0000000..3ee5686
--- /dev/null
+++ b/powerline/lint/markedjson/loader.py
@@ -0,0 +1,25 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+from powerline.lint.markedjson.reader import Reader
+from powerline.lint.markedjson.scanner import Scanner
+from powerline.lint.markedjson.parser import Parser
+from powerline.lint.markedjson.composer import Composer
+from powerline.lint.markedjson.constructor import Constructor
+from powerline.lint.markedjson.resolver import Resolver
+from powerline.lint.markedjson.error import echoerr
+
+
+class Loader(Reader, Scanner, Parser, Composer, Constructor, Resolver):
+ def __init__(self, stream):
+ Reader.__init__(self, stream)
+ Scanner.__init__(self)
+ Parser.__init__(self)
+ Composer.__init__(self)
+ Constructor.__init__(self)
+ Resolver.__init__(self)
+ self.haserrors = False
+
+ def echoerr(self, *args, **kwargs):
+ echoerr(*args, **kwargs)
+ self.haserrors = True
diff --git a/powerline/lint/markedjson/markedvalue.py b/powerline/lint/markedjson/markedvalue.py
new file mode 100644
index 0000000..3b8db3e
--- /dev/null
+++ b/powerline/lint/markedjson/markedvalue.py
@@ -0,0 +1,151 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+from powerline.lib.unicode import unicode
+
+
+def gen_new(cls):
+ def __new__(arg_cls, value, mark):
+ r = super(arg_cls, arg_cls).__new__(arg_cls, value)
+ r.mark = mark
+ r.value = value
+ return r
+ return __new__
+
+
+def gen_init(cls):
+ def __init__(self, value, mark):
+ return cls.__init__(self, value)
+ return __init__
+
+
+def gen_getnewargs(cls):
+ def __getnewargs__(self):
+ return (self.value, self.mark)
+ return __getnewargs__
+
+
+class MarkedUnicode(unicode):
+ __new__ = gen_new(unicode)
+ __getnewargs__ = gen_getnewargs(unicode)
+
+ def _proc_partition(self, part_result):
+ pointdiff = 1
+ r = []
+ for s in part_result:
+ r.append(MarkedUnicode(s, self.mark.advance_string(pointdiff)))
+ pointdiff += len(s)
+ return tuple(r)
+
+ def rpartition(self, sep):
+ return self._proc_partition(super(MarkedUnicode, self).rpartition(sep))
+
+ def partition(self, sep):
+ return self._proc_partition(super(MarkedUnicode, self).partition(sep))
+
+
+class MarkedInt(int):
+ __new__ = gen_new(int)
+ __getnewargs__ = gen_getnewargs(int)
+
+
+class MarkedFloat(float):
+ __new__ = gen_new(float)
+ __getnewargs__ = gen_getnewargs(float)
+
+
+class MarkedDict(dict):
+ __init__ = gen_init(dict)
+ __getnewargs__ = gen_getnewargs(dict)
+
+ def __new__(arg_cls, value, mark):
+ r = super(arg_cls, arg_cls).__new__(arg_cls, value)
+ r.mark = mark
+ r.value = value
+ r.keydict = dict(((key, key) for key in r))
+ return r
+
+ def setmerged(self, d):
+ try:
+ self.mark.set_merged_mark(d.mark)
+ except AttributeError:
+ pass
+
+ def __setitem__(self, key, value):
+ try:
+ old_value = self[key]
+ except KeyError:
+ pass
+ else:
+ try:
+ key.mark.set_old_mark(self.keydict[key].mark)
+ except AttributeError:
+ pass
+ except KeyError:
+ pass
+ try:
+ value.mark.set_old_mark(old_value.mark)
+ except AttributeError:
+ pass
+ dict.__setitem__(self, key, value)
+ self.keydict[key] = key
+
+ def update(self, *args, **kwargs):
+ dict.update(self, *args, **kwargs)
+ self.keydict = dict(((key, key) for key in self))
+
+ def copy(self):
+ return MarkedDict(super(MarkedDict, self).copy(), self.mark)
+
+
+class MarkedList(list):
+ __new__ = gen_new(list)
+ __init__ = gen_init(list)
+ __getnewargs__ = gen_getnewargs(list)
+
+
+class MarkedValue:
+ def __init__(self, value, mark):
+ self.mark = mark
+ self.value = value
+
+ __getinitargs__ = gen_getnewargs(None)
+
+
+specialclasses = {
+ unicode: MarkedUnicode,
+ int: MarkedInt,
+ float: MarkedFloat,
+ dict: MarkedDict,
+ list: MarkedList,
+}
+
+classcache = {}
+
+
+def gen_marked_value(value, mark, use_special_classes=True):
+ if use_special_classes and value.__class__ in specialclasses:
+ Marked = specialclasses[value.__class__]
+ elif value.__class__ in classcache:
+ Marked = classcache[value.__class__]
+ else:
+ class Marked(MarkedValue):
+ for func in value.__class__.__dict__:
+ if func == 'copy':
+ def copy(self):
+ return self.__class__(self.value.copy(), self.mark)
+ elif func not in set(('__init__', '__new__', '__getattribute__')):
+ if func in set(('__eq__',)):
+ # HACK to make marked dictionaries always work
+ exec ((
+ 'def {0}(self, *args):\n'
+ ' return self.value.{0}(*[arg.value if isinstance(arg, MarkedValue) else arg for arg in args])'
+ ).format(func))
+ else:
+ exec ((
+ 'def {0}(self, *args, **kwargs):\n'
+ ' return self.value.{0}(*args, **kwargs)\n'
+ ).format(func))
+ classcache[value.__class__] = Marked
+
+ return Marked(value, mark)
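+
+
+# Example (illustration only, assuming `mark` is a Mark instance): marked
+# values behave like their payload but remember where they came from:
+#
+#     v = gen_marked_value('foo', mark)
+#     v == 'foo'   # True: MarkedUnicode is a unicode (str) subclass
+#     v.mark       # the Mark the value was constructed with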
diff --git a/powerline/lint/markedjson/nodes.py b/powerline/lint/markedjson/nodes.py
new file mode 100644
index 0000000..66ad843
--- /dev/null
+++ b/powerline/lint/markedjson/nodes.py
@@ -0,0 +1,55 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+
+class Node(object):
+ def __init__(self, tag, value, start_mark, end_mark):
+ self.tag = tag
+ self.value = value
+ self.start_mark = start_mark
+ self.end_mark = end_mark
+
+ def __repr__(self):
+ value = self.value
+ # if isinstance(value, list):
+ # if len(value) == 0:
+ # value = '<empty>'
+ # elif len(value) == 1:
+ # value = '<1 item>'
+ # else:
+ # value = '<%d items>' % len(value)
+ # else:
+ # if len(value) > 75:
+ # value = repr(value[:70]+u' ... ')
+ # else:
+ # value = repr(value)
+ value = repr(value)
+ return '%s(tag=%r, value=%s)' % (self.__class__.__name__, self.tag, value)
+
+
+class ScalarNode(Node):
+ id = 'scalar'
+
+ def __init__(self, tag, value, start_mark=None, end_mark=None, style=None):
+ self.tag = tag
+ self.value = value
+ self.start_mark = start_mark
+ self.end_mark = end_mark
+ self.style = style
+
+
+class CollectionNode(Node):
+ def __init__(self, tag, value, start_mark=None, end_mark=None, flow_style=None):
+ self.tag = tag
+ self.value = value
+ self.start_mark = start_mark
+ self.end_mark = end_mark
+ self.flow_style = flow_style
+
+
+class SequenceNode(CollectionNode):
+ id = 'sequence'
+
+
+class MappingNode(CollectionNode):
+ id = 'mapping'
diff --git a/powerline/lint/markedjson/parser.py b/powerline/lint/markedjson/parser.py
new file mode 100644
index 0000000..336a2a2
--- /dev/null
+++ b/powerline/lint/markedjson/parser.py
@@ -0,0 +1,255 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+from powerline.lint.markedjson.error import MarkedError
+from powerline.lint.markedjson import tokens
+from powerline.lint.markedjson import events
+
+
+class ParserError(MarkedError):
+ pass
+
+
+class Parser:
+ def __init__(self):
+ self.current_event = None
+ self.yaml_version = None
+ self.states = []
+ self.marks = []
+ self.state = self.parse_stream_start
+
+ def dispose(self):
+ # Reset the state attributes (to clear self-references)
+ self.states = []
+ self.state = None
+
+ def check_event(self, *choices):
+ # Check the type of the next event.
+ if self.current_event is None:
+ if self.state:
+ self.current_event = self.state()
+ if self.current_event is not None:
+ if not choices:
+ return True
+ for choice in choices:
+ if isinstance(self.current_event, choice):
+ return True
+ return False
+
+ def peek_event(self):
+ # Get the next event.
+ if self.current_event is None:
+ if self.state:
+ self.current_event = self.state()
+ return self.current_event
+
+ def get_event(self):
+ # Get the next event and proceed further.
+ if self.current_event is None:
+ if self.state:
+ self.current_event = self.state()
+ value = self.current_event
+ self.current_event = None
+ return value
+
+ # stream ::= STREAM-START implicit_document? explicit_document* STREAM-END
+ # implicit_document ::= block_node DOCUMENT-END*
+ # explicit_document ::= DIRECTIVE* DOCUMENT-START block_node? DOCUMENT-END*
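+
+ # For the JSON document '{"x": 1}' the parser emits the sequence:
+ # StreamStartEvent, DocumentStartEvent, MappingStartEvent,
+ # ScalarEvent('x'), ScalarEvent(1), MappingEndEvent,
+ # DocumentEndEvent, StreamEndEvent.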
+
+ def parse_stream_start(self):
+ # Parse the stream start.
+ token = self.get_token()
+ event = events.StreamStartEvent(token.start_mark, token.end_mark, encoding=token.encoding)
+
+ # Prepare the next state.
+ self.state = self.parse_implicit_document_start
+
+ return event
+
+ def parse_implicit_document_start(self):
+ # Parse an implicit document.
+ if not self.check_token(tokens.StreamEndToken):
+ token = self.peek_token()
+ start_mark = end_mark = token.start_mark
+ event = events.DocumentStartEvent(start_mark, end_mark, explicit=False)
+
+ # Prepare the next state.
+ self.states.append(self.parse_document_end)
+ self.state = self.parse_node
+
+ return event
+
+ else:
+ return self.parse_document_start()
+
+ def parse_document_start(self):
+ # Parse an explicit document.
+ if not self.check_token(tokens.StreamEndToken):
+ token = self.peek_token()
+ self.echoerr(
+ None, None,
+ ('expected \'<stream end>\', but found %r' % token.id), token.start_mark
+ )
+ return events.StreamEndEvent(token.start_mark, token.end_mark)
+ else:
+ # Parse the end of the stream.
+ token = self.get_token()
+ event = events.StreamEndEvent(token.start_mark, token.end_mark)
+ assert not self.states
+ assert not self.marks
+ self.state = None
+ return event
+
+ def parse_document_end(self):
+ # Parse the document end.
+ token = self.peek_token()
+ start_mark = end_mark = token.start_mark
+ explicit = False
+ event = events.DocumentEndEvent(start_mark, end_mark, explicit=explicit)
+
+ # Prepare the next state.
+ self.state = self.parse_document_start
+
+ return event
+
+ def parse_document_content(self):
+ return self.parse_node()
+
+ def parse_node(self, indentless_sequence=False):
+ start_mark = end_mark = self.peek_token().start_mark
+ event = None
+ implicit = True
+ if self.check_token(tokens.ScalarToken):
+ token = self.get_token()
+ end_mark = token.end_mark
+ if token.plain:
+ implicit = (True, False)
+ else:
+ implicit = (False, True)
+ event = events.ScalarEvent(implicit, token.value, start_mark, end_mark, style=token.style)
+ self.state = self.states.pop()
+ elif self.check_token(tokens.FlowSequenceStartToken):
+ end_mark = self.peek_token().end_mark
+ event = events.SequenceStartEvent(implicit, start_mark, end_mark, flow_style=True)
+ self.state = self.parse_flow_sequence_first_entry
+ elif self.check_token(tokens.FlowMappingStartToken):
+ end_mark = self.peek_token().end_mark
+ event = events.MappingStartEvent(implicit, start_mark, end_mark, flow_style=True)
+ self.state = self.parse_flow_mapping_first_key
+ else:
+ token = self.peek_token()
+ raise ParserError(
+ 'while parsing a flow node', start_mark,
+ 'expected the node content, but found %r' % token.id,
+ token.start_mark
+ )
+ return event
+
+ def parse_flow_sequence_first_entry(self):
+ token = self.get_token()
+ self.marks.append(token.start_mark)
+ return self.parse_flow_sequence_entry(first=True)
+
+ def parse_flow_sequence_entry(self, first=False):
+ if not self.check_token(tokens.FlowSequenceEndToken):
+ if not first:
+ if self.check_token(tokens.FlowEntryToken):
+ self.get_token()
+ if self.check_token(tokens.FlowSequenceEndToken):
+ token = self.peek_token()
+ self.echoerr(
+ 'While parsing a flow sequence', self.marks[-1],
+ ('expected sequence value, but got %r' % token.id), token.start_mark
+ )
+ else:
+ token = self.peek_token()
+ raise ParserError(
+ 'while parsing a flow sequence', self.marks[-1],
+ ('expected \',\' or \']\', but got %r' % token.id), token.start_mark
+ )
+
+ if not self.check_token(tokens.FlowSequenceEndToken):
+ self.states.append(self.parse_flow_sequence_entry)
+ return self.parse_node()
+ token = self.get_token()
+ event = events.SequenceEndEvent(token.start_mark, token.end_mark)
+ self.state = self.states.pop()
+ self.marks.pop()
+ return event
+
+ def parse_flow_sequence_entry_mapping_end(self):
+ self.state = self.parse_flow_sequence_entry
+ token = self.peek_token()
+ return events.MappingEndEvent(token.start_mark, token.start_mark)
+
+ def parse_flow_mapping_first_key(self):
+ token = self.get_token()
+ self.marks.append(token.start_mark)
+ return self.parse_flow_mapping_key(first=True)
+
+ def parse_flow_mapping_key(self, first=False):
+ if not self.check_token(tokens.FlowMappingEndToken):
+ if not first:
+ if self.check_token(tokens.FlowEntryToken):
+ self.get_token()
+ if self.check_token(tokens.FlowMappingEndToken):
+ token = self.peek_token()
+ self.echoerr(
+ 'While parsing a flow mapping', self.marks[-1],
+ ('expected mapping key, but got %r' % token.id), token.start_mark
+ )
+ else:
+ token = self.peek_token()
+ raise ParserError(
+ 'while parsing a flow mapping', self.marks[-1],
+ ('expected \',\' or \'}\', but got %r' % token.id), token.start_mark
+ )
+ if self.check_token(tokens.KeyToken):
+ token = self.get_token()
+ if not self.check_token(tokens.ValueToken, tokens.FlowEntryToken, tokens.FlowMappingEndToken):
+ self.states.append(self.parse_flow_mapping_value)
+ return self.parse_node()
+ else:
+ token = self.peek_token()
+ raise ParserError(
+ 'while parsing a flow mapping', self.marks[-1],
+ ('expected value, but got %r' % token.id), token.start_mark
+ )
+ elif not self.check_token(tokens.FlowMappingEndToken):
+ token = self.peek_token()
+ expect_key = self.check_token(tokens.ValueToken, tokens.FlowEntryToken)
+ if not expect_key:
+ self.get_token()
+ expect_key = self.check_token(tokens.ValueToken)
+
+ if expect_key:
+ raise ParserError(
+ 'while parsing a flow mapping', self.marks[-1],
+ ('expected string key, but got %r' % token.id), token.start_mark
+ )
+ else:
+ token = self.peek_token()
+ raise ParserError(
+ 'while parsing a flow mapping', self.marks[-1],
+ ('expected \':\', but got %r' % token.id), token.start_mark
+ )
+ token = self.get_token()
+ event = events.MappingEndEvent(token.start_mark, token.end_mark)
+ self.state = self.states.pop()
+ self.marks.pop()
+ return event
+
+ def parse_flow_mapping_value(self):
+ if self.check_token(tokens.ValueToken):
+ token = self.get_token()
+ if not self.check_token(tokens.FlowEntryToken, tokens.FlowMappingEndToken):
+ self.states.append(self.parse_flow_mapping_key)
+ return self.parse_node()
+
+ token = self.peek_token()
+ raise ParserError(
+ 'while parsing a flow mapping', self.marks[-1],
+ ('expected mapping value, but got %r' % token.id), token.start_mark
+ )
diff --git a/powerline/lint/markedjson/reader.py b/powerline/lint/markedjson/reader.py
new file mode 100644
index 0000000..0ca4516
--- /dev/null
+++ b/powerline/lint/markedjson/reader.py
@@ -0,0 +1,141 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import codecs
+
+from powerline.lint.markedjson.error import MarkedError, Mark, NON_PRINTABLE_RE
+from powerline.lib.unicode import unicode
+
+
+# This module contains abstractions for the input stream. You don’t have
+# to look further, there is no pretty code here.
+
+
+class ReaderError(MarkedError):
+ pass
+
+
+class Reader(object):
+ # Reader:
+ # - determines the data encoding and converts it to a unicode string,
+ # - checks if characters are in allowed range,
+ # - adds '\0' to the end.
+
+ # Reader accepts:
+ # - a file-like object whose `read` method returns `str`.
+
+ # Yeah, it’s ugly and slow.
+ def __init__(self, stream):
+ self.name = None
+ self.stream = None
+ self.stream_pointer = 0
+ self.eof = True
+ self.buffer = ''
+ self.pointer = 0
+ self.full_buffer = unicode('')
+ self.full_pointer = 0
+ self.raw_buffer = None
+ self.raw_decode = codecs.utf_8_decode
+ self.encoding = 'utf-8'
+ self.index = 0
+ self.line = 0
+ self.column = 0
+
+ self.stream = stream
+ self.name = getattr(stream, 'name', '<file>')
+ self.eof = False
+ self.raw_buffer = None
+
+ while not self.eof and (self.raw_buffer is None or len(self.raw_buffer) < 2):
+ self.update_raw()
+ self.update(1)
+
+ def peek(self, index=0):
+ try:
+ return self.buffer[self.pointer + index]
+ except IndexError:
+ self.update(index + 1)
+ return self.buffer[self.pointer + index]
+
+ def prefix(self, length=1):
+ if self.pointer + length >= len(self.buffer):
+ self.update(length)
+ return self.buffer[self.pointer:self.pointer + length]
+
+ def update_pointer(self, length):
+ while length:
+ ch = self.buffer[self.pointer]
+ self.pointer += 1
+ self.full_pointer += 1
+ self.index += 1
+ if ch == '\n':
+ self.line += 1
+ self.column = 0
+ else:
+ self.column += 1
+ length -= 1
+
+ def forward(self, length=1):
+ if self.pointer + length + 1 >= len(self.buffer):
+ self.update(length + 1)
+ self.update_pointer(length)
+
+ def get_mark(self):
+ return Mark(self.name, self.line, self.column, self.full_buffer, self.full_pointer)
+
+ def check_printable(self, data):
+ match = NON_PRINTABLE_RE.search(data)
+ if match:
+ self.update_pointer(match.start())
+ raise ReaderError(
+ 'while reading from stream', None,
+ 'found special characters which are not allowed',
+ Mark(self.name, self.line, self.column, self.full_buffer, self.full_pointer)
+ )
+
+ def update(self, length):
+ if self.raw_buffer is None:
+ return
+ self.buffer = self.buffer[self.pointer:]
+ self.pointer = 0
+ while len(self.buffer) < length:
+ if not self.eof:
+ self.update_raw()
+ try:
+ data, converted = self.raw_decode(self.raw_buffer, 'strict', self.eof)
+ except UnicodeDecodeError as exc:
+ character = self.raw_buffer[exc.start]
+ position = self.stream_pointer - len(self.raw_buffer) + exc.start
+ data, converted = self.raw_decode(self.raw_buffer[:exc.start], 'strict', self.eof)
+ self.buffer += data
+ self.full_buffer += data + '<' + str(ord(character)) + '>'
+ self.raw_buffer = self.raw_buffer[converted:]
+ self.update_pointer(exc.start - 1)
+ raise ReaderError(
+ 'while reading from stream', None,
+ 'found character #x%04x that cannot be decoded by UTF-8 codec' % ord(character),
+ Mark(self.name, self.line, self.column, self.full_buffer, position)
+ )
+ self.buffer += data
+ self.full_buffer += data
+ self.raw_buffer = self.raw_buffer[converted:]
+ self.check_printable(data)
+ if self.eof:
+ self.buffer += '\0'
+ self.raw_buffer = None
+ break
+
+ def update_raw(self, size=-1):
+ # Was size=4096
+ assert(size < 0)
+ # WARNING: this reads the whole stream at once. To restore the former
+ # behaviour of reading N characters at a time, one must make sure that
+ # reading never stops in the middle of a unicode character.
+ data = self.stream.read(size)
+ if self.raw_buffer is None:
+ self.raw_buffer = data
+ else:
+ self.raw_buffer += data
+ self.stream_pointer += len(data)
+ if not data:
+ self.eof = True
diff --git a/powerline/lint/markedjson/resolver.py b/powerline/lint/markedjson/resolver.py
new file mode 100644
index 0000000..fa8ceaa
--- /dev/null
+++ b/powerline/lint/markedjson/resolver.py
@@ -0,0 +1,131 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import re
+
+from powerline.lint.markedjson.error import MarkedError
+from powerline.lint.markedjson import nodes
+
+
+class ResolverError(MarkedError):
+ pass
+
+
+class BaseResolver:
+ DEFAULT_SCALAR_TAG = 'tag:yaml.org,2002:str'
+ DEFAULT_SEQUENCE_TAG = 'tag:yaml.org,2002:seq'
+ DEFAULT_MAPPING_TAG = 'tag:yaml.org,2002:map'
+
+ yaml_implicit_resolvers = {}
+ yaml_path_resolvers = {}
+
+ def __init__(self):
+ self.resolver_exact_paths = []
+ self.resolver_prefix_paths = []
+
+ @classmethod
+ def add_implicit_resolver(cls, tag, regexp, first):
+ if 'yaml_implicit_resolvers' not in cls.__dict__:
+ cls.yaml_implicit_resolvers = cls.yaml_implicit_resolvers.copy()
+ if first is None:
+ first = [None]
+ for ch in first:
+ cls.yaml_implicit_resolvers.setdefault(ch, []).append((tag, regexp))
+
+ def descend_resolver(self, current_node, current_index):
+ if not self.yaml_path_resolvers:
+ return
+ exact_paths = {}
+ prefix_paths = []
+ if current_node:
+ depth = len(self.resolver_prefix_paths)
+ for path, kind in self.resolver_prefix_paths[-1]:
+ if self.check_resolver_prefix(depth, path, kind, current_node, current_index):
+ if len(path) > depth:
+ prefix_paths.append((path, kind))
+ else:
+ exact_paths[kind] = self.yaml_path_resolvers[path, kind]
+ else:
+ for path, kind in self.yaml_path_resolvers:
+ if not path:
+ exact_paths[kind] = self.yaml_path_resolvers[path, kind]
+ else:
+ prefix_paths.append((path, kind))
+ self.resolver_exact_paths.append(exact_paths)
+ self.resolver_prefix_paths.append(prefix_paths)
+
+ def ascend_resolver(self):
+ if not self.yaml_path_resolvers:
+ return
+ self.resolver_exact_paths.pop()
+ self.resolver_prefix_paths.pop()
+
+ def check_resolver_prefix(self, depth, path, kind, current_node, current_index):
+ node_check, index_check = path[depth - 1]
+ if isinstance(node_check, str):
+ if current_node.tag != node_check:
+ return
+ elif node_check is not None:
+ if not isinstance(current_node, node_check):
+ return
+ if index_check is True and current_index is not None:
+ return
+ if ((index_check is False or index_check is None)
+ and current_index is None):
+ return
+ if isinstance(index_check, str):
+ if not (isinstance(current_index, nodes.ScalarNode) and index_check == current_index.value):
+ return
+ elif isinstance(index_check, int) and not isinstance(index_check, bool):
+ if index_check != current_index:
+ return
+ return True
+
+ def resolve(self, kind, value, implicit, mark=None):
+ if kind is nodes.ScalarNode and implicit[0]:
+ if value == '':
+ resolvers = self.yaml_implicit_resolvers.get('', [])
+ else:
+ resolvers = self.yaml_implicit_resolvers.get(value[0], [])
+ resolvers += self.yaml_implicit_resolvers.get(None, [])
+ for tag, regexp in resolvers:
+ if regexp.match(value):
+ return tag
+ else:
+ self.echoerr(
+ 'While resolving plain scalar', None,
+ 'expected floating-point value, integer, null or boolean, but got %r' % value,
+ mark
+ )
+ return self.DEFAULT_SCALAR_TAG
+ if kind is nodes.ScalarNode:
+ return self.DEFAULT_SCALAR_TAG
+ elif kind is nodes.SequenceNode:
+ return self.DEFAULT_SEQUENCE_TAG
+ elif kind is nodes.MappingNode:
+ return self.DEFAULT_MAPPING_TAG
+
+
+class Resolver(BaseResolver):
+ pass
+
+
+Resolver.add_implicit_resolver(
+ 'tag:yaml.org,2002:bool',
+ re.compile(r'''^(?:true|false)$''', re.X),
+ list('yYnNtTfFoO'))
+
+Resolver.add_implicit_resolver(
+ 'tag:yaml.org,2002:float',
+ re.compile(r'^-?(?:0|[1-9]\d*)(?=[.eE])(?:\.\d+)?(?:[eE][-+]?\d+)?$', re.X),
+ list('-0123456789'))
+
+Resolver.add_implicit_resolver(
+ 'tag:yaml.org,2002:int',
+ re.compile(r'^(?:0|-?[1-9]\d*)$', re.X),
+ list('-0123456789'))
+
+Resolver.add_implicit_resolver(
+ 'tag:yaml.org,2002:null',
+ re.compile(r'^null$', re.X),
+ ['n'])
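+
+# Example: with these resolvers the plain scalar '12' is tagged
+# tag:yaml.org,2002:int, '1.5e3' float, 'null' null and 'true' bool;
+# anything unrecognized is reported as an error and falls back to the
+# default string tag.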
diff --git a/powerline/lint/markedjson/scanner.py b/powerline/lint/markedjson/scanner.py
new file mode 100644
index 0000000..b0bddf3
--- /dev/null
+++ b/powerline/lint/markedjson/scanner.py
@@ -0,0 +1,499 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+from string import hexdigits
+
+from powerline.lint.markedjson.error import MarkedError
+from powerline.lint.markedjson import tokens
+from powerline.lib.unicode import unicode, unichr, surrogate_pair_to_character
+
+
+hexdigits_set = set(hexdigits)
+
+
+# Scanner produces tokens of the following types:
+# STREAM-START
+# STREAM-END
+# DOCUMENT-START
+# DOCUMENT-END
+# FLOW-SEQUENCE-START
+# FLOW-MAPPING-START
+# FLOW-SEQUENCE-END
+# FLOW-MAPPING-END
+# FLOW-ENTRY
+# KEY
+# VALUE
+# SCALAR(value, plain, style)
+#
+# Read comments in the Scanner code for more details.
+
+
+class ScannerError(MarkedError):
+ pass
+
+
+class SimpleKey:
+ # See below simple keys treatment.
+ def __init__(self, token_number, index, line, column, mark):
+ self.token_number = token_number
+ self.index = index
+ self.line = line
+ self.column = column
+ self.mark = mark
+
+
+class Scanner:
+ def __init__(self):
+ '''Initialize the scanner.'''
+ # It is assumed that Scanner and Reader will have a common descendant.
+ # Reader does the dirty work of checking for BOM and converting the
+ # input data to Unicode. It also adds NUL to the end.
+ #
+ # Reader supports the following methods
+ # self.peek(i=0) # peek the next i-th character
+ # self.prefix(l=1) # peek the next l characters
+ # self.forward(l=1) # read the next l characters and move the pointer.
+
+ # Have we reached the end of the stream?
+ self.done = False
+
+ # The number of unclosed '{' and '['. `flow_level == 0` means block
+ # context.
+ self.flow_level = 0
+
+ # List of processed tokens that are not yet emitted.
+ self.tokens = []
+
+ # Add the STREAM-START token.
+ self.fetch_stream_start()
+
+ # Number of tokens that were emitted through the `get_token` method.
+ self.tokens_taken = 0
+
+ # Variables related to simple keys treatment.
+
+ # A simple key is a key that is not denoted by the '?' indicator.
+ # We emit the KEY token before all keys, so when we find a potential
+ # simple key, we try to locate the corresponding ':' indicator.
+ # Simple keys should be limited to a single line.
+
+ # Can a simple key start at the current position? A simple key may
+ # start:
+ # - after '{', '[', ',' (in the flow context),
+ self.allow_simple_key = False
+
+ # Keep track of possible simple keys. This is a dictionary. The key
+ # is `flow_level`; there can be no more than one possible simple key
+ # for each level. The value is a SimpleKey record:
+ # (token_number, index, line, column, mark)
+ # A simple key may start with SCALAR(flow), '[', or '{' tokens.
+ self.possible_simple_keys = {}
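+
+ # Example: while scanning '{"a": 1}' the scalar '"a"' is saved as
+ # a possible simple key; when the following ':' is seen fetch_value()
+ # inserts a KeyToken just before that scalar in the token queue.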
+
+ # Public methods.
+
+ def check_token(self, *choices):
+ # Check if the next token is one of the given types.
+ while self.need_more_tokens():
+ self.fetch_more_tokens()
+ if self.tokens:
+ if not choices:
+ return True
+ for choice in choices:
+ if isinstance(self.tokens[0], choice):
+ return True
+ return False
+
+ def peek_token(self):
+ # Return the next token, but do not delete it from the queue.
+ while self.need_more_tokens():
+ self.fetch_more_tokens()
+ if self.tokens:
+ return self.tokens[0]
+
+ def get_token(self):
+ # Return the next token.
+ while self.need_more_tokens():
+ self.fetch_more_tokens()
+ if self.tokens:
+ self.tokens_taken += 1
+ return self.tokens.pop(0)
+
+ # Private methods.
+
+ def need_more_tokens(self):
+ if self.done:
+ return False
+ if not self.tokens:
+ return True
+ # The current token may be a potential simple key, so we
+ # need to look further.
+ self.stale_possible_simple_keys()
+ if self.next_possible_simple_key() == self.tokens_taken:
+ return True
+
+ def fetch_more_tokens(self):
+
+ # Eat whitespaces and comments until we reach the next token.
+ self.scan_to_next_token()
+
+ # Remove obsolete possible simple keys.
+ self.stale_possible_simple_keys()
+
+ # Peek the next character.
+ ch = self.peek()
+
+ # Is it the end of stream?
+ if ch == '\0':
+ return self.fetch_stream_end()
+
+ # Note: the order of the following checks is NOT significant.
+
+ # Is it the flow sequence start indicator?
+ if ch == '[':
+ return self.fetch_flow_sequence_start()
+
+ # Is it the flow mapping start indicator?
+ if ch == '{':
+ return self.fetch_flow_mapping_start()
+
+ # Is it the flow sequence end indicator?
+ if ch == ']':
+ return self.fetch_flow_sequence_end()
+
+ # Is it the flow mapping end indicator?
+ if ch == '}':
+ return self.fetch_flow_mapping_end()
+
+ # Is it the flow entry indicator?
+ if ch == ',':
+ return self.fetch_flow_entry()
+
+ # Is it the value indicator?
+ if ch == ':' and self.flow_level:
+ return self.fetch_value()
+
+ # Is it a double quoted scalar?
+ if ch == '"':
+ return self.fetch_double()
+
+ # It must be a plain scalar then.
+ if self.check_plain():
+ return self.fetch_plain()
+
+ # No? It’s an error. Let’s produce a nice error message.
+ raise ScannerError(
+ 'while scanning for the next token', None,
+ 'found character %r that cannot start any token' % ch,
+ self.get_mark()
+ )
+
+ # Simple keys treatment.
+
+ def next_possible_simple_key(self):
+ # Return the number of the nearest possible simple key. Actually we
+ # don’t need to loop through the whole dictionary. We may replace it
+ # with the following code:
+ # if not self.possible_simple_keys:
+ # return None
+ # return self.possible_simple_keys[
+ # min(self.possible_simple_keys.keys())].token_number
+ min_token_number = None
+ for level in self.possible_simple_keys:
+ key = self.possible_simple_keys[level]
+ if min_token_number is None or key.token_number < min_token_number:
+ min_token_number = key.token_number
+ return min_token_number
+
+ def stale_possible_simple_keys(self):
+ # Remove entries that are no longer possible simple keys. According to
+ # the YAML specification, simple keys should be limited to a single
+ # line.
+ # Disabling this procedure will allow simple keys of any length and
+ # height (may cause problems if indentation is broken though).
+ for level in list(self.possible_simple_keys):
+ key = self.possible_simple_keys[level]
+ if key.line != self.line:
+ del self.possible_simple_keys[level]
+
+ def save_possible_simple_key(self):
+ # The next token may start a simple key. We check if it’s possible
+ # and save its position. This function is called for
+ # SCALAR(flow), '[', and '{'.
+
+ # The next token might be a simple key. Let’s save its number and
+ # position.
+ if self.allow_simple_key:
+ self.remove_possible_simple_key()
+ token_number = self.tokens_taken + len(self.tokens)
+ key = SimpleKey(token_number, self.index, self.line, self.column, self.get_mark())
+ self.possible_simple_keys[self.flow_level] = key
+
+ def remove_possible_simple_key(self):
+ # Remove the saved possible key position at the current flow level.
+ if self.flow_level in self.possible_simple_keys:
+ del self.possible_simple_keys[self.flow_level]
+
+ # Fetchers.
+
+ def fetch_stream_start(self):
+ # We always add STREAM-START as the first token and STREAM-END as the
+ # last token.
+
+ # Read the token.
+ mark = self.get_mark()
+
+ # Add STREAM-START.
+ self.tokens.append(tokens.StreamStartToken(mark, mark, encoding=self.encoding))
+
+ def fetch_stream_end(self):
+ # Reset simple keys.
+ self.remove_possible_simple_key()
+ self.allow_simple_key = False
+ self.possible_simple_keys = {}
+
+ # Read the token.
+ mark = self.get_mark()
+
+ # Add STREAM-END.
+ self.tokens.append(tokens.StreamEndToken(mark, mark))
+
+ # The stream is finished.
+ self.done = True
+
+ def fetch_flow_sequence_start(self):
+ self.fetch_flow_collection_start(tokens.FlowSequenceStartToken)
+
+ def fetch_flow_mapping_start(self):
+ self.fetch_flow_collection_start(tokens.FlowMappingStartToken)
+
+ def fetch_flow_collection_start(self, TokenClass):
+ # '[' and '{' may start a simple key.
+ self.save_possible_simple_key()
+
+ # Increase the flow level.
+ self.flow_level += 1
+
+ # Simple keys are allowed after '[' and '{'.
+ self.allow_simple_key = True
+
+ # Add FLOW-SEQUENCE-START or FLOW-MAPPING-START.
+ start_mark = self.get_mark()
+ self.forward()
+ end_mark = self.get_mark()
+ self.tokens.append(TokenClass(start_mark, end_mark))
+
+ def fetch_flow_sequence_end(self):
+ self.fetch_flow_collection_end(tokens.FlowSequenceEndToken)
+
+ def fetch_flow_mapping_end(self):
+ self.fetch_flow_collection_end(tokens.FlowMappingEndToken)
+
+ def fetch_flow_collection_end(self, TokenClass):
+ # Reset possible simple key on the current level.
+ self.remove_possible_simple_key()
+
+ # Decrease the flow level.
+ self.flow_level -= 1
+
+ # No simple keys after ']' or '}'.
+ self.allow_simple_key = False
+
+ # Add FLOW-SEQUENCE-END or FLOW-MAPPING-END.
+ start_mark = self.get_mark()
+ self.forward()
+ end_mark = self.get_mark()
+ self.tokens.append(TokenClass(start_mark, end_mark))
+
+ def fetch_value(self):
+ # Do we determine a simple key?
+ if self.flow_level in self.possible_simple_keys:
+
+ # Add KEY.
+ key = self.possible_simple_keys[self.flow_level]
+ del self.possible_simple_keys[self.flow_level]
+ self.tokens.insert(key.token_number - self.tokens_taken, tokens.KeyToken(key.mark, key.mark))
+
+ # There cannot be two simple keys one after another.
+ self.allow_simple_key = False
+
+ # Add VALUE.
+ start_mark = self.get_mark()
+ self.forward()
+ end_mark = self.get_mark()
+ self.tokens.append(tokens.ValueToken(start_mark, end_mark))
+
+ def fetch_flow_entry(self):
+ # Simple keys are allowed after ','.
+ self.allow_simple_key = True
+
+ # Reset possible simple key on the current level.
+ self.remove_possible_simple_key()
+
+ # Add FLOW-ENTRY.
+ start_mark = self.get_mark()
+ self.forward()
+ end_mark = self.get_mark()
+ self.tokens.append(tokens.FlowEntryToken(start_mark, end_mark))
+
+ def fetch_double(self):
+ # A flow scalar could be a simple key.
+ self.save_possible_simple_key()
+
+ # No simple keys after flow scalars.
+ self.allow_simple_key = False
+
+ # Scan and add SCALAR.
+ self.tokens.append(self.scan_flow_scalar())
+
+ def fetch_plain(self):
+
+ self.save_possible_simple_key()
+
+ # No simple keys after plain scalars.
+ self.allow_simple_key = False
+
+ # Scan and add SCALAR. May change `allow_simple_key`.
+ self.tokens.append(self.scan_plain())
+
+ # Checkers.
+
+ def check_plain(self):
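+ # JSON plain scalars are numbers, 'null', 'true' and 'false', so a
+ # plain token can only start with a digit, '-', 'n', 't' or 'f'.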
+ return self.peek() in '0123456789-ntf'
+
+ # Scanners.
+
+ def scan_to_next_token(self):
+ while self.peek() in ' \t\n':
+ self.forward()
+
+ def scan_flow_scalar(self):
+ # See the specification for details.
+ # Note that we lose indentation rules for quoted scalars. Quoted
+ # scalars don’t need to adhere to indentation because " and ' clearly
+ # mark their beginning and end. Therefore we are less restrictive
+ # than the specification requires. We only need to check
+ # that document separators are not included in scalars.
+ chunks = []
+ start_mark = self.get_mark()
+ quote = self.peek()
+ self.forward()
+ chunks.extend(self.scan_flow_scalar_non_spaces(start_mark))
+ while self.peek() != quote:
+ chunks.extend(self.scan_flow_scalar_spaces(start_mark))
+ chunks.extend(self.scan_flow_scalar_non_spaces(start_mark))
+ self.forward()
+ end_mark = self.get_mark()
+ return tokens.ScalarToken(unicode().join(chunks), False, start_mark, end_mark, '"')
+
+ ESCAPE_REPLACEMENTS = {
+ 'b': '\x08',
+ 't': '\x09',
+ 'n': '\x0A',
+ 'f': '\x0C',
+ 'r': '\x0D',
+ '"': '\"',
+ '\\': '\\',
+ }
+
+ ESCAPE_CODES = {
+ 'u': 4,
+ }
+
+ def scan_flow_scalar_non_spaces(self, start_mark):
+ # See the specification for details.
+ chunks = []
+ while True:
+ length = 0
+ while self.peek(length) not in '\"\\\0 \t\n':
+ length += 1
+ if length:
+ chunks.append(self.prefix(length))
+ self.forward(length)
+ ch = self.peek()
+ if ch == '\\':
+ self.forward()
+ ch = self.peek()
+ if ch in self.ESCAPE_REPLACEMENTS:
+ chunks.append(self.ESCAPE_REPLACEMENTS[ch])
+ self.forward()
+ elif ch in self.ESCAPE_CODES:
+ length = self.ESCAPE_CODES[ch]
+ self.forward()
+ for k in range(length):
+ if self.peek(k) not in hexdigits:
+ raise ScannerError(
+ 'while scanning a double-quoted scalar', start_mark,
+ 'expected escape sequence of %d hexadecimal digits, but found %r' % (
+ length, self.peek(k)),
+ self.get_mark()
+ )
+ code = int(self.prefix(length), 16)
+ self.forward(length)
+ if 0xD800 <= code <= 0xDBFF:
+ # Start of a surrogate pair (high surrogates occupy U+D800–U+DBFF)
+ next_char = self.prefix(6)
+ if (
+ next_char[0] != '\\'
+ or next_char[1] != 'u'
+ or not (set(next_char[2:]) < hexdigits_set)
+ or not (0xDC00 <= int(next_char[2:], 16) <= 0xDFFF)
+ ):
+ raise ScannerError(
+ 'while scanning a double-quoted scalar', start_mark,
+ 'expected an escape sequence with the low surrogate of the pair, but found %r' % (
+ next_char
+ ),
+ self.get_mark()
+ )
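+ # Standard UTF-16 decoding of a surrogate pair:
+ # 0x10000 + ((high - 0xD800) << 10) + (low - 0xDC00);
+ # surrogate_pair_to_character() is expected to implement this.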
+ code = surrogate_pair_to_character(code, int(next_char[2:], 16))
+ self.forward(6)
+ chunks.append(unichr(code))
+ else:
+ raise ScannerError(
+ 'while scanning a double-quoted scalar', start_mark,
+ ('found unknown escape character %r' % ch), self.get_mark()
+ )
+ else:
+ return chunks
+
+ def scan_flow_scalar_spaces(self, start_mark):
+ # See the specification for details.
+ chunks = []
+ length = 0
+ while self.peek(length) in ' \t':
+ length += 1
+ whitespaces = self.prefix(length)
+ self.forward(length)
+ ch = self.peek()
+ if ch == '\0':
+ raise ScannerError(
+ 'while scanning a quoted scalar', start_mark,
+ 'found unexpected end of stream', self.get_mark()
+ )
+ elif ch == '\n':
+ raise ScannerError(
+ 'while scanning a quoted scalar', start_mark,
+ 'found unexpected line end', self.get_mark()
+ )
+ else:
+ chunks.append(whitespaces)
+ return chunks
+
+ def scan_plain(self):
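+ # Plain scalars in JSON are numbers and the ``true``/``false``/``null``
+ # literals; the character set below is the union of all characters
+ # that may appear in those tokens.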
+ chunks = []
+ start_mark = self.get_mark()
+ spaces = []
+ while True:
+ length = 0
+ while self.peek(length) in 'eE.0123456789nul-tr+fas':
+ length += 1
+ if length == 0:
+ break
+ self.allow_simple_key = False
+ chunks.extend(spaces)
+ chunks.append(self.prefix(length))
+ self.forward(length)
+ end_mark = self.get_mark()
+ return tokens.ScalarToken(''.join(chunks), True, start_mark, end_mark)
diff --git a/powerline/lint/markedjson/tokens.py b/powerline/lint/markedjson/tokens.py
new file mode 100644
index 0000000..6fa8bf1
--- /dev/null
+++ b/powerline/lint/markedjson/tokens.py
@@ -0,0 +1,72 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+
+class Token(object):
+ def __init__(self, start_mark, end_mark):
+ self.start_mark = start_mark
+ self.end_mark = end_mark
+
+ def __repr__(self):
+ attributes = [
+ key for key in self.__dict__
+ if not key.endswith('_mark')
+ ]
+ attributes.sort()
+ arguments = ', '.join([
+ '%s=%r' % (key, getattr(self, key))
+ for key in attributes
+ ])
+ return '%s(%s)' % (self.__class__.__name__, arguments)
+
+
+class StreamStartToken(Token):
+ id = '<stream start>'
+
+ def __init__(self, start_mark=None, end_mark=None, encoding=None):
+ self.start_mark = start_mark
+ self.end_mark = end_mark
+ self.encoding = encoding
+
+
+class StreamEndToken(Token):
+ id = '<stream end>'
+
+
+class FlowSequenceStartToken(Token):
+ id = '['
+
+
+class FlowMappingStartToken(Token):
+ id = '{'
+
+
+class FlowSequenceEndToken(Token):
+ id = ']'
+
+
+class FlowMappingEndToken(Token):
+ id = '}'
+
+
+class KeyToken(Token):
+ id = '?'
+
+
+class ValueToken(Token):
+ id = ':'
+
+
+class FlowEntryToken(Token):
+ id = ','
+
+
+class ScalarToken(Token):
+ id = '<scalar>'
+
+ def __init__(self, value, plain, start_mark, end_mark, style=None):
+ self.value = value
+ self.plain = plain
+ self.start_mark = start_mark
+ self.end_mark = end_mark
+ self.style = style
diff --git a/powerline/lint/selfcheck.py b/powerline/lint/selfcheck.py
new file mode 100644
index 0000000..06d1fbe
--- /dev/null
+++ b/powerline/lint/selfcheck.py
@@ -0,0 +1,16 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+from powerline.lib.unicode import unicode
+
+
+def havemarks(*args, **kwargs):
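+ '''Check that every value in ``args`` (recursively) carries a ``mark``
+
+ This is an internal self-check: values produced by the marked JSON
+ loader are expected to remember their source position so that lint
+ errors can point at it. ``origin`` only makes the assertion message
+ more precise.
+ '''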
+ origin = kwargs.get('origin', '')
+ for i, v in enumerate(args):
+ if not hasattr(v, 'mark'):
+ raise AssertionError('Value #{0}/{1} ({2!r}) has no attribute `mark`'.format(origin, i, v))
+ if isinstance(v, dict):
+ for key, val in v.items():
+ havemarks(key, val, origin=(origin + '[' + unicode(i) + ']/' + unicode(key)))
+ elif isinstance(v, list):
+ havemarks(*v, origin=(origin + '[' + unicode(i) + ']'))
diff --git a/powerline/lint/spec.py b/powerline/lint/spec.py
new file mode 100644
index 0000000..6a54441
--- /dev/null
+++ b/powerline/lint/spec.py
@@ -0,0 +1,759 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import itertools
+import re
+
+from copy import copy
+
+from powerline.lib.unicode import unicode
+from powerline.lint.markedjson.error import echoerr, DelayedEchoErr, NON_PRINTABLE_STR
+from powerline.lint.selfcheck import havemarks
+
+
+NON_PRINTABLE_RE = re.compile(
+ NON_PRINTABLE_STR.translate({
+ ord('\t'): None,
+ ord('\n'): None,
+ 0x0085: None,
+ })
+)
+
+
+class Spec(object):
+ '''Class that describes some JSON value
+
+ In powerline it is only used to describe JSON values stored in the
+ powerline configuration.
+
+ :param dict keys:
+ Dictionary that maps keys that may be present in the given JSON
+ dictionary to their descriptions. If this parameter is not empty it
+ implies that the described value has a dictionary type.
+ Non-dictionary types must be described using ``Spec()`` without
+ arguments.
+
+ .. note::
+ Methods that create the specifications return ``self``, so calls to
+ them may be chained: ``Spec().type(unicode).re('^\w+$')``. This does
+ not apply to functions that *apply* a specification like
+ :py:meth:`Spec.match`.
+
+ .. note::
+ Methods starting with ``check_`` return two values: the first
+ determines whether the caller should proceed with running other
+ checks, the second determines whether there were any problems (i.e.
+ whether an error was reported). One should not call these methods
+ directly: there is the :py:meth:`Spec.match` method for checking
+ values.
+
+ .. note::
+ In the ``check_`` and ``match`` methods specifications are
+ identified by their indices in order to simplify the
+ :py:meth:`Spec.copy` method.
+
+ Some common parameters:
+
+ ``data``:
+ Whatever data was supplied by the first caller for the checker
+ functions. It is not processed by :py:class:`Spec` methods in any
+ fashion.
+ ``context``:
+ :py:class:`powerline.lint.context.Context` instance that describes
+ the context of the value. :py:class:`Spec` methods only use its
+ ``.key`` attribute for error messages.
+ ``echoerr``:
+ Callable that should be used to echo errors. Is supposed to take four
+ optional keyword arguments: ``problem``, ``problem_mark``, ``context``,
+ ``context_mark``.
+ ``value``:
+ Checked value.
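+
+ Example (an illustrative sketch; the key names are made up and are not
+ part of any real powerline configuration)::
+
+     spec = Spec(
+         theme=Spec().type(unicode),
+         spaces=Spec().unsigned().optional(),
+     ).context_message('Error while checking {key}')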
+ '''
+
+ def __init__(self, **keys):
+ self.specs = []
+ self.keys = {}
+ self.checks = []
+ self.cmsg = ''
+ self.isoptional = False
+ self.uspecs = []
+ self.ufailmsg = lambda key: 'found unknown key: {0}'.format(key)
+ self.did_type = False
+ self.update(**keys)
+
+ def update(self, **keys):
+ '''Describe additional keys that may be present in the given JSON value
+
+ If called with keyword arguments this implies that the described
+ value is a dictionary. If called without keyword parameters it is
+ a no-op.
+
+ :return: self.
+ '''
+ for k, v in keys.items():
+ self.keys[k] = len(self.specs)
+ self.specs.append(v)
+ if self.keys and not self.did_type:
+ self.type(dict)
+ self.did_type = True
+ return self
+
+ def copy(self, copied=None):
+ '''Deep copy the spec
+
+ :param dict copied:
+ Internal dictionary used for storing already copied values. This
+ parameter should not be used.
+
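+ Example (editor’s sketch of the idiom used for configuration specs:
+ store the bound method, then call it to obtain fresh copies)::
+
+     theme_spec = Spec().type(unicode).copy
+     first = theme_spec()   # independent deep copy
+     second = theme_spec()  # another independent copy
+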
+ :return: New :py:class:`Spec` object that is a deep copy of ``self``.
+ '''
+ copied = copied or {}
+ try:
+ return copied[id(self)]
+ except KeyError:
+ instance = self.__class__()
+ copied[id(self)] = instance
+ return instance._update(self.__dict__, copied)
+
+ def _update(self, d, copied):
+ '''Helper for the :py:meth:`Spec.copy` function
+
+ Populates new instance with values taken from the old one.
+
+ :param dict d:
+ ``__dict__`` of the old instance.
+ :param dict copied:
+ Storage for already copied values.
+ '''
+ self.__dict__.update(d)
+ self.keys = copy(self.keys)
+ self.checks = copy(self.checks)
+ self.uspecs = copy(self.uspecs)
+ self.specs = [spec.copy(copied) for spec in self.specs]
+ return self
+
+ def unknown_spec(self, keyfunc, spec):
+ '''Define specification for non-static keys
+
+ This method should be used if key names cannot be determined in
+ advance or if a number of keys share an identical spec (in order to
+ not repeat it). The :py:meth:`Spec.match` method processes
+ a dictionary in the following order:
+
+ * First it tries to use specifications provided at initialization or
+ by the :py:meth:`Spec.update` method.
+ * If no specification for the given key was provided it processes
+ specifications from the ``keyfunc`` arguments in the order they were
+ supplied. Once some key matches a ``keyfunc``, the corresponding
+ ``spec`` argument is used to determine the correctness of the value.
+
+ :param Spec keyfunc:
+ :py:class:`Spec` instance or a regular function that returns two
+ values (the same :py:meth:`Spec.match` returns). This argument is
+ used to match keys that were not provided at initialization or via
+ :py:meth:`Spec.update`.
+ :param Spec spec:
+ :py:class:`Spec` instance that will be used to check keys matched by
+ ``keyfunc``.
+
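+ Example (illustrative): accept any identifier-like key, each mapping
+ to a string value::
+
+     spec = Spec().unknown_spec(Spec().ident(), Spec().type(unicode))
+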
+ :return: self.
+ '''
+ if isinstance(keyfunc, Spec):
+ self.specs.append(keyfunc)
+ keyfunc = len(self.specs) - 1
+ self.specs.append(spec)
+ self.uspecs.append((keyfunc, len(self.specs) - 1))
+ return self
+
+ def unknown_msg(self, msgfunc):
+ '''Define the message which will be used when an unknown key is found
+
+ An “unknown” key is a key that was not provided at initialization or
+ via :py:meth:`Spec.update` and did not match any ``keyfunc`` provided
+ via :py:meth:`Spec.unknown_spec`.
+
+ :param msgfunc:
+ Function that takes the unknown key as its argument and returns the
+ message text. The text will appear at the start of the sentence.
+
+ :return: self.
+ '''
+ self.ufailmsg = msgfunc
+ return self
+
+ def context_message(self, msg):
+ '''Define the message that describes the context
+
+ :param str msg:
+ Message that describes the context. It is written using
+ :py:meth:`str.format` syntax and is expected to use the keyword
+ parameter ``key``.
+
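+ Example (illustrative)::
+
+     spec = Spec(colors=Spec()).context_message(
+         'Error while checking colors (key {key})')
+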
+ :return: self.
+ '''
+ self.cmsg = msg
+ for spec in self.specs:
+ if not spec.cmsg:
+ spec.context_message(msg)
+ return self
+
+ def check_type(self, value, context_mark, data, context, echoerr, types):
+ '''Check that given value matches given type(s)
+
+ :param tuple types:
+ Tuple of accepted types. Since :py:class:`Spec` is supposed to
+ describe JSON values, only ``dict``, ``list``, ``unicode``, ``int``,
+ ``bool``, ``float`` and ``NoneType`` types make sense.
+
+ :return: proceed, hadproblem.
+ '''
+ havemarks(value)
+ if type(value.value) not in types:
+ echoerr(
+ context=self.cmsg.format(key=context.key),
+ context_mark=context_mark,
+ problem='{0!r} must be a {1} instance, not {2}'.format(
+ value,
+ ', '.join((t.__name__ for t in types)),
+ type(value.value).__name__
+ ),
+ problem_mark=value.mark
+ )
+ return False, True
+ return True, False
+
+ def check_func(self, value, context_mark, data, context, echoerr, func, msg_func):
+ '''Check value using given function
+
+ :param function func:
+ Callable that should accept four positional parameters:
+
+ #. checked value,
+ #. ``data`` parameter with arbitrary data (supplied by top-level
+ caller),
+ #. current context and
+ #. function used for echoing errors.
+
+ This callable should return three values:
+
+ #. determines whether the ``check_func`` caller should proceed with
+ calling other checks,
+ #. determines whether ``check_func`` should echo the error on its
+ own (it should be set to False if ``func`` echoes the error
+ itself) and
+ #. determines whether the function has found some errors in the
+ checked value.
+
+ :param function msg_func:
+ Callable that takes the checked value as its only positional
+ parameter and returns a string that describes the problem. Only
+ useful for small checker functions since it is ignored when the
+ second returned value is false.
+
+ :return: proceed, hadproblem.
+ '''
+ havemarks(value)
+ proceed, echo, hadproblem = func(value, data, context, echoerr)
+ if echo and hadproblem:
+ echoerr(context=self.cmsg.format(key=context.key),
+ context_mark=context_mark,
+ problem=msg_func(value),
+ problem_mark=value.mark)
+ return proceed, hadproblem
+
+ def check_list(self, value, context_mark, data, context, echoerr, item_func, msg_func):
+ '''Check that each value in the list matches given specification
+
+ :param function item_func:
+ Callable like ``func`` from :py:meth:`Spec.check_func`. Unlike
+ ``func`` this callable is called for each value in the list and may
+ be a :py:class:`Spec` object index.
+ :param function msg_func:
+ Callable like ``msg_func`` from :py:meth:`Spec.check_func`. Should
+ accept one problematic item; it is not used when ``item_func`` is
+ a :py:class:`Spec` object index.
+
+ :return: proceed, hadproblem.
+ '''
+ havemarks(value)
+ i = 0
+ hadproblem = False
+ for item in value:
+ havemarks(item)
+ if isinstance(item_func, int):
+ spec = self.specs[item_func]
+ proceed, fhadproblem = spec.match(
+ item,
+ value.mark,
+ data,
+ context.enter_item('list item ' + unicode(i), item),
+ echoerr
+ )
+ else:
+ proceed, echo, fhadproblem = item_func(item, data, context, echoerr)
+ if echo and fhadproblem:
+ echoerr(context=self.cmsg.format(key=context.key + '/list item ' + unicode(i)),
+ context_mark=value.mark,
+ problem=msg_func(item),
+ problem_mark=item.mark)
+ if fhadproblem:
+ hadproblem = True
+ if not proceed:
+ return proceed, hadproblem
+ i += 1
+ return True, hadproblem
+
+ def check_either(self, value, context_mark, data, context, echoerr, start, end):
+ '''Check that given value matches one of the given specifications
+
+ :param int start:
+ First specification index.
+ :param int end:
+ Specification index that is greater by one than the last
+ specification index.
+
+ This method does not give an error if any specification from
+ ``self.specs[start:end]`` is matched by the given value.
+ '''
+ havemarks(value)
+ new_echoerr = DelayedEchoErr(
+ echoerr,
+ 'One of the either variants failed. Messages from the first variant:',
+ 'messages from the next variant:'
+ )
+
+ hadproblem = False
+ for spec in self.specs[start:end]:
+ proceed, hadproblem = spec.match(value, value.mark, data, context, new_echoerr)
+ new_echoerr.next_variant()
+ if not proceed:
+ break
+ if not hadproblem:
+ return True, False
+
+ new_echoerr.echo_all()
+
+ return False, hadproblem
+
+ def check_tuple(self, value, context_mark, data, context, echoerr, start, end):
+ '''Check that given value is a list with items matching specifications
+
+ :param int start:
+ First specification index.
+ :param int end:
+ Specification index that is greater by one than the last
+ specification index.
+
+ This method checks that each item in the value list matches
+ specification with index ``start + item_number``.
+ '''
+ havemarks(value)
+ hadproblem = False
+ for (i, item, spec) in zip(itertools.count(), value, self.specs[start:end]):
+ proceed, ihadproblem = spec.match(
+ item,
+ value.mark,
+ data,
+ context.enter_item('tuple item ' + unicode(i), item),
+ echoerr
+ )
+ if ihadproblem:
+ hadproblem = True
+ if not proceed:
+ return False, hadproblem
+ return True, hadproblem
+
+ def check_printable(self, value, context_mark, data, context, echoerr, _):
+ '''Check that given unicode string contains only printable characters
+ '''
+ hadproblem = False
+ for match in NON_PRINTABLE_RE.finditer(value):
+ hadproblem = True
+ echoerr(
+ context=self.cmsg.format(key=context.key),
+ context_mark=value.mark,
+ problem='found non-printable character U+{0:04x} in a configuration string'.format(
+ ord(match.group(0))),
+ problem_mark=value.mark.advance_string(match.start() + 1)
+ )
+ return True, hadproblem
+
+ def printable(self, *args):
+ self.type(unicode)
+ self.checks.append(('check_printable', args))
+ return self
+
+ def type(self, *args):
+ '''Describe value that has one of the types given in arguments
+
+ :param args:
+ List of accepted types. Since :py:class:`Spec` is supposed to
+ describe JSON values, only ``dict``, ``list``, ``unicode``, ``int``,
+ ``bool``, ``float`` and ``NoneType`` types make sense.
+
+ :return: self.
+ '''
+ self.checks.append(('check_type', args))
+ return self
+
+ cmp_funcs = {
+ 'le': lambda x, y: x <= y,
+ 'lt': lambda x, y: x < y,
+ 'ge': lambda x, y: x >= y,
+ 'gt': lambda x, y: x > y,
+ 'eq': lambda x, y: x == y,
+ }
+
+ cmp_msgs = {
+ 'le': 'less than or equal to',
+ 'lt': 'less than',
+ 'ge': 'greater than or equal to',
+ 'gt': 'greater than',
+ 'eq': 'equal to',
+ }
+
+ def len(self, comparison, cint, msg_func=None):
+ '''Describe value that has given length
+
+ :param str comparison:
+ Type of the comparison. Valid values: ``le``, ``lt``, ``ge``,
+ ``gt``, ``eq``.
+ :param int cint:
+ Integer with which length is compared.
+ :param function msg_func:
+ Function that should accept the checked value and return a message
+ that describes the problem with this value. The default will emit
+ something like “length of ['foo', 'bar'] is not greater than 10”.
+
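+ Example (illustrative): require a non-empty list::
+
+     nonempty = Spec().type(list).len('ge', 1)
+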
+ :return: self.
+ '''
+ cmp_func = self.cmp_funcs[comparison]
+ msg_func = (
+ msg_func
+ or (lambda value: 'length of {0!r} is not {1} {2}'.format(
+ value, self.cmp_msgs[comparison], cint))
+ )
+ self.checks.append((
+ 'check_func',
+ (lambda value, *args: (True, True, not cmp_func(len(value), cint))),
+ msg_func
+ ))
+ return self
+
+ def cmp(self, comparison, cint, msg_func=None):
+ '''Describe value that is a number or string that has given property
+
+ :param str comparison:
+ Type of the comparison. Valid values: ``le``, ``lt``, ``ge``,
+ ``gt``, ``eq``. This argument restricts the checked number or string
+ to values for which the given comparison holds.
+ :param cint:
+ Number or string with which value is compared. Type of this
+ parameter affects required type of the checked value: ``str`` and
+ ``unicode`` types imply ``unicode`` values, ``float`` type implies
+ that value can be either ``int`` or ``float``, ``int`` type implies
+ ``int`` value and for any other type the behavior is undefined.
+ :param function msg_func:
+ Function that should accept the checked value and return a message
+ that describes the problem with this value. The default will emit
+ something like “10 is not greater than 10”.
+
+ :return: self.
+ '''
+ if type(cint) is str:
+ self.type(unicode)
+ elif type(cint) is float:
+ self.type(int, float)
+ else:
+ self.type(type(cint))
+ cmp_func = self.cmp_funcs[comparison]
+ msg_func = msg_func or (lambda value: '{0} is not {1} {2}'.format(value, self.cmp_msgs[comparison], cint))
+ self.checks.append((
+ 'check_func',
+ (lambda value, *args: (True, True, not cmp_func(value.value, cint))),
+ msg_func
+ ))
+ return self
+
+ def unsigned(self, msg_func=None):
+ '''Describe unsigned integer value
+
+ :param function msg_func:
+ Function that should accept the checked value and return a message
+ that describes the problem with this value.
+
+ :return: self.
+ '''
+ self.type(int)
+ self.checks.append((
+ 'check_func',
+ (lambda value, *args: (True, True, value < 0)),
+ (lambda value: '{0} must be greater than or equal to zero'.format(value))
+ ))
+ return self
+
+ def list(self, item_func, msg_func=None):
+ '''Describe list with any number of elements, each matching given spec
+
+ :param item_func:
+ :py:class:`Spec` instance or a callable. Check out
+ :py:meth:`Spec.check_list` documentation for more details. Note that
+ in :py:meth:`Spec.check_list` description :py:class:`Spec` instance
+ is replaced with its index in ``self.specs``.
+ :param function msg_func:
+ Function that should accept the checked value and return a message
+ that describes the problem with this value. The default will emit
+ just “failed check”, which is rather uninformative.
+
+ :return: self.
+ '''
+ self.type(list)
+ if isinstance(item_func, Spec):
+ self.specs.append(item_func)
+ item_func = len(self.specs) - 1
+ self.checks.append(('check_list', item_func, msg_func or (lambda item: 'failed check')))
+ return self
+
+ def tuple(self, *specs):
+ '''Describe list with the given number of elements, each matching corresponding spec
+
+ :param (Spec,) specs:
+ List of specifications. The last element(s) in this list may be
+ optional. Each element in this list describes the element with the
+ same index in the checked value. Check out
+ :py:meth:`Spec.check_tuple` for more details, but note that there the
+ list of specifications is replaced with start and end indices in
+ ``self.specs``.
+
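+ Example (illustrative): a two-element list whose second element may
+ be omitted::
+
+     pair = Spec().tuple(Spec().type(unicode), Spec().type(int).optional())
+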
+ :return: self.
+ '''
+ self.type(list)
+
+ max_len = len(specs)
+ min_len = max_len
+ for spec in reversed(specs):
+ if spec.isoptional:
+ min_len -= 1
+ else:
+ break
+ if max_len == min_len:
+ self.len('eq', len(specs))
+ else:
+ if min_len > 0:
+ self.len('ge', min_len)
+ self.len('le', max_len)
+
+ start = len(self.specs)
+ self.specs.extend(specs)
+ self.checks.append(('check_tuple', start, len(self.specs)))
+ return self
+
+ def func(self, func, msg_func=None):
+ '''Describe value that is checked by the given function
+
+ Check out :py:meth:`Spec.check_func` documentation for more details.
+ '''
+ self.checks.append(('check_func', func, msg_func or (lambda value: 'failed check')))
+ return self
+
+ def re(self, regex, msg_func=None):
+ '''Describe value that is a string that matches given regular expression
+
+ :param str regex:
+ Regular expression that should be matched by the value.
+ :param function msg_func:
+ Function that should accept the checked value and return a message
+ that describes the problem with this value. The default will emit
+ something like “String "xyz" does not match "[a-f]+"”.
+
+ :return: self.
+ '''
+ self.type(unicode)
+ compiled = re.compile(regex)
+ msg_func = msg_func or (lambda value: 'String "{0}" does not match "{1}"'.format(value, regex))
+ self.checks.append((
+ 'check_func',
+ (lambda value, *args: (True, True, not compiled.match(value.value))),
+ msg_func
+ ))
+ return self
+
+ def ident(self, msg_func=None):
+ '''Describe value that is an identifier like ``foo:bar`` or ``foo``
+
+ :param function msg_func:
+ Function that should accept the checked value and return a message
+ that describes the problem with this value. The default will emit
+ something like “String "xyz" is not an … identifier”.
+
+ :return: self.
+ '''
+ msg_func = (
+ msg_func
+ or (lambda value: 'String "{0}" is not an alphanumeric/underscore colon-separated identifier'.format(value))
+ )
+ return self.re('^\w+(?::\w+)?$', msg_func)
+
+ def oneof(self, collection, msg_func=None):
+ '''Describe value that is equal to one of the values in the collection
+
+ :param set collection:
+ A collection of possible values.
+ :param function msg_func:
+ Function that should accept the checked value and return a message
+ that describes the problem with this value. The default will emit
+ something like “"xyz" must be one of {'abc', 'def', 'ghi'}”.
+
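+ Example (illustrative)::
+
+     side = Spec().type(unicode).oneof(set(('left', 'right')))
+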
+ :return: self.
+ '''
+ msg_func = msg_func or (lambda value: '"{0}" must be one of {1!r}'.format(value, list(collection)))
+ self.checks.append((
+ 'check_func',
+ (lambda value, *args: (True, True, value not in collection)),
+ msg_func
+ ))
+ return self
+
+ def error(self, msg):
+ '''Describe value that must not be there
+
+ Useful for giving more descriptive errors for some specific keys than
+ just “found unknown key: shutdown_event” or for forbidding certain
+ values when :py:meth:`Spec.unknown_spec` was used.
+
+ :param str msg:
+ Message given for the offending value. It is formatted using
+ :py:meth:`str.format` with a single positional parameter: the value
+ itself.
+
+ :return: self.
+ '''
+ self.checks.append((
+ 'check_func',
+ (lambda *args: (True, True, True)),
+ (lambda value: msg.format(value))
+ ))
+ return self
+
+ def either(self, *specs):
+ '''Describe value that matches one of the given specs
+
+ Check out the :py:meth:`Spec.check_either` method documentation for
+ more details, but note that there the list of specs is replaced by
+ start and end indices in ``self.specs``.
+
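+ Example (illustrative): a string or a list of strings::
+
+     strings = Spec().either(
+         Spec().type(unicode),
+         Spec().list(Spec().type(unicode)),
+     )
+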
+ :return: self.
+ '''
+ start = len(self.specs)
+ self.specs.extend(specs)
+ self.checks.append(('check_either', start, len(self.specs)))
+ return self
+
+ def optional(self):
+ '''Mark value as optional
+
+ Only useful for key specs in :py:meth:`Spec.__init__` and
+ :py:meth:`Spec.update` and for trailing specs supplied to
+ :py:meth:`Spec.tuple`.
+
+ :return: self.
+ '''
+ self.isoptional = True
+ return self
+
+ def required(self):
+ '''Mark value as required
+
+ Only useful for key specs in :py:meth:`Spec.__init__` and
+ :py:meth:`Spec.update` and for trailing specs supplied to
+ :py:meth:`Spec.tuple`.
+
+ .. note::
+ Value is required by default. This method is only useful for
+ altering existing specification (or rather its copy).
+
+ :return: self.
+ '''
+ self.isoptional = False
+ return self
+
+ def match_checks(self, *args):
+ '''Process checks registered for the given value
+
+ Processes only “top-level” checks: key specifications given at
+ initialization or via :py:meth:`Spec.unknown_spec` are processed by
+ :py:meth:`Spec.match`.
+
+ :return: proceed, hadproblem.
+ '''
+ hadproblem = False
+ for check in self.checks:
+ proceed, chadproblem = getattr(self, check[0])(*(args + check[1:]))
+ if chadproblem:
+ hadproblem = True
+ if not proceed:
+ return False, hadproblem
+ return True, hadproblem
+
+ def match(self, value, context_mark=None, data=None, context=(), echoerr=echoerr):
+ '''Check that given value matches this specification
+
+ :return: proceed, hadproblem.
+ '''
+ havemarks(value)
+ proceed, hadproblem = self.match_checks(value, context_mark, data, context, echoerr)
+ if proceed:
+ if self.keys or self.uspecs:
+ for key, vali in self.keys.items():
+ valspec = self.specs[vali]
+ if key in value:
+ proceed, mhadproblem = valspec.match(
+ value[key],
+ value.mark,
+ data,
+ context.enter_key(value, key),
+ echoerr
+ )
+ if mhadproblem:
+ hadproblem = True
+ if not proceed:
+ return False, hadproblem
+ else:
+ if not valspec.isoptional:
+ hadproblem = True
+ echoerr(context=self.cmsg.format(key=context.key),
+ context_mark=None,
+ problem='required key is missing: {0}'.format(key),
+ problem_mark=value.mark)
+ for key in value.keys():
+ havemarks(key)
+ if key not in self.keys:
+ for keyfunc, vali in self.uspecs:
+ valspec = self.specs[vali]
+ if isinstance(keyfunc, int):
+ spec = self.specs[keyfunc]
+ proceed, khadproblem = spec.match(key, context_mark, data, context, echoerr)
+ else:
+ proceed, khadproblem = keyfunc(key, data, context, echoerr)
+ if khadproblem:
+ hadproblem = True
+ if proceed:
+ proceed, vhadproblem = valspec.match(
+ value[key],
+ value.mark,
+ data,
+ context.enter_key(value, key),
+ echoerr
+ )
+ if vhadproblem:
+ hadproblem = True
+ break
+ else:
+ hadproblem = True
+ if self.ufailmsg:
+ echoerr(context=self.cmsg.format(key=context.key),
+ context_mark=None,
+ problem=self.ufailmsg(key),
+ problem_mark=key.mark)
+ return True, hadproblem
+
+ def __getitem__(self, key):
+ '''Get specification for the given key
+ '''
+ return self.specs[self.keys[key]]
+
+ def __setitem__(self, key, value):
+ '''Set specification for the given key
+ '''
+ self.update(**{key: value})