summaryrefslogtreecommitdiffstats
path: root/powerline/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'powerline/__init__.py')
-rw-r--r--powerline/__init__.py991
1 files changed, 991 insertions, 0 deletions
diff --git a/powerline/__init__.py b/powerline/__init__.py
new file mode 100644
index 0000000..a50b939
--- /dev/null
+++ b/powerline/__init__.py
@@ -0,0 +1,991 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import os
+import sys
+import logging
+
+from threading import Lock, Event
+
+from powerline.colorscheme import Colorscheme
+from powerline.lib.config import ConfigLoader
+from powerline.lib.unicode import unicode, safe_unicode, FailedUnicode
+from powerline.config import DEFAULT_SYSTEM_CONFIG_DIR
+from powerline.lib.dict import mergedicts
+from powerline.lib.encoding import get_preferred_output_encoding
+from powerline.lib.path import join
+from powerline.version import __version__
+
+class NotInterceptedError(BaseException):
+ pass
+
+
+def _config_loader_condition(path):
+ if path and os.path.isfile(path):
+ return path
+ return None
+
+
+def _find_config_files(search_paths, config_file, config_loader=None, loader_callback=None):
+ config_file += '.json'
+ found = False
+ for path in search_paths:
+ config_file_path = join(path, config_file)
+ if os.path.isfile(config_file_path):
+ yield config_file_path
+ found = True
+ elif config_loader:
+ config_loader.register_missing(_config_loader_condition, loader_callback, config_file_path)
+ if not found:
+ raise IOError('Config file not found in search paths ({0}): {1}'.format(
+ ', '.join(search_paths),
+ config_file
+ ))
+
+
+class PowerlineLogger(object):
+ '''Proxy class for logging.Logger instance
+
+ It emits messages in format ``{ext}:{prefix}:{message}`` where
+
+ ``{ext}``
+ is a used powerline extension (e.g. “vim”, “shell”, “ipython”).
+ ``{prefix}``
+ is a local prefix, usually a segment name.
+ ``{message}``
+ is the original message passed to one of the logging methods.
+
+ Each of the methods (``critical``, ``exception``, ``info``, ``error``,
+ ``warn``, ``debug``) expects to receive message in an ``str.format`` format,
+ not in printf-like format.
+
+ Log is saved to the location :ref:`specified by user <config-common-log>`.
+ '''
+
+ def __init__(self, use_daemon_threads, logger, ext):
+ self.logger = logger
+ self.ext = ext
+ self.use_daemon_threads = use_daemon_threads
+ self.prefix = ''
+ self.last_msgs = {}
+
+ def _log(self, attr, msg, *args, **kwargs):
+ prefix = kwargs.get('prefix') or self.prefix
+ prefix = self.ext + ((':' + prefix) if prefix else '')
+ msg = safe_unicode(msg)
+ if args or kwargs:
+ args = [safe_unicode(s) if isinstance(s, bytes) else s for s in args]
+ kwargs = dict((
+ (k, safe_unicode(v) if isinstance(v, bytes) else v)
+ for k, v in kwargs.items()
+ ))
+ msg = msg.format(*args, **kwargs)
+ msg = prefix + ':' + msg
+ key = attr + ':' + prefix
+ if msg != self.last_msgs.get(key):
+ getattr(self.logger, attr)(msg)
+ self.last_msgs[key] = msg
+
+ def critical(self, msg, *args, **kwargs):
+ self._log('critical', msg, *args, **kwargs)
+
+ def exception(self, msg, *args, **kwargs):
+ self._log('exception', msg, *args, **kwargs)
+
+ def info(self, msg, *args, **kwargs):
+ self._log('info', msg, *args, **kwargs)
+
+ def error(self, msg, *args, **kwargs):
+ self._log('error', msg, *args, **kwargs)
+
+ def warn(self, msg, *args, **kwargs):
+ self._log('warning', msg, *args, **kwargs)
+
+ def debug(self, msg, *args, **kwargs):
+ self._log('debug', msg, *args, **kwargs)
+
+
+_fallback_logger = None
+
+
+def get_fallback_logger(stream=None):
+ global _fallback_logger
+ if _fallback_logger:
+ return _fallback_logger
+
+ log_format = '%(asctime)s:%(levelname)s:%(message)s'
+ formatter = logging.Formatter(log_format)
+
+ level = logging.WARNING
+ handler = logging.StreamHandler(stream)
+ handler.setLevel(level)
+ handler.setFormatter(formatter)
+
+ logger = logging.Logger('powerline')
+ logger.setLevel(level)
+ logger.addHandler(handler)
+ _fallback_logger = PowerlineLogger(None, logger, '_fallback_')
+ return _fallback_logger
+
+
+def _generate_change_callback(lock, key, dictionary):
+ def on_file_change(path):
+ with lock:
+ dictionary[key] = True
+ return on_file_change
+
+
+def get_config_paths():
+ '''Get configuration paths from environment variables.
+
+ Uses $XDG_CONFIG_HOME and $XDG_CONFIG_DIRS according to the XDG specification.
+
+ :return: list of paths
+ '''
+ config_home = os.environ.get('XDG_CONFIG_HOME', os.path.join(os.path.expanduser('~'), '.config'))
+ config_path = join(config_home, 'powerline')
+ config_paths = [config_path]
+ config_dirs = os.environ.get('XDG_CONFIG_DIRS', DEFAULT_SYSTEM_CONFIG_DIR)
+ if config_dirs is not None:
+ config_paths[:0] = reversed([join(d, 'powerline') for d in config_dirs.split(':')])
+ plugin_path = join(os.path.realpath(os.path.dirname(__file__)), 'config_files')
+ config_paths.insert(0, plugin_path)
+ return config_paths
+
+
+def generate_config_finder(get_config_paths=get_config_paths):
+ '''Generate find_config_files function
+
+ This function will find .json file given its path.
+
+ :param function get_config_paths:
+ Function that being called with no arguments will return a list of paths
+ that should be searched for configuration files.
+
+ :return:
+ Function that being given configuration file name will return full path
+ to it or raise IOError if it failed to find the file.
+ '''
+ config_paths = get_config_paths()
+ return lambda *args: _find_config_files(config_paths, *args)
+
+
+def load_config(cfg_path, find_config_files, config_loader, loader_callback=None):
+ '''Load configuration file and setup watches
+
+ Watches are only set up if loader_callback is not None.
+
+ :param str cfg_path:
+ Path for configuration file that should be loaded.
+ :param function find_config_files:
+ Function that finds configuration file. Check out the description of
+ the return value of ``generate_config_finder`` function.
+ :param ConfigLoader config_loader:
+ Configuration file loader class instance.
+ :param function loader_callback:
+ Function that will be called by config_loader when change to
+ configuration file is detected.
+
+ :return: Configuration file contents.
+ '''
+ found_files = find_config_files(cfg_path, config_loader, loader_callback)
+ ret = None
+ for path in found_files:
+ if loader_callback:
+ config_loader.register(loader_callback, path)
+ if ret is None:
+ ret = config_loader.load(path)
+ else:
+ mergedicts(ret, config_loader.load(path))
+ return ret
+
+
+def _set_log_handlers(common_config, logger, get_module_attr, stream=None):
+ '''Set log handlers
+
+ :param dict common_config:
+ Configuration dictionary used to create handler.
+ :param logging.Logger logger:
+ Logger to which handlers will be attached.
+ :param func get_module_attr:
+ :py:func:`gen_module_attr_getter` output.
+ :param file stream:
+ Stream to use by default for :py:class:`logging.StreamHandler` in place
+ of :py:attr:`sys.stderr`. May be ``None``.
+ '''
+ log_targets = common_config['log_file']
+ num_handlers = 0
+ for log_target in log_targets:
+ if log_target is None:
+ log_target = ['logging.StreamHandler', []]
+ elif isinstance(log_target, unicode):
+ log_target = os.path.expanduser(log_target)
+ log_dir = os.path.dirname(log_target)
+ if log_dir and not os.path.isdir(log_dir):
+ os.mkdir(log_dir)
+ log_target = ['logging.FileHandler', [[log_target]]]
+ module, handler_class_name = log_target[0].rpartition('.')[::2]
+ module = module or 'logging.handlers'
+ try:
+ handler_class_args = log_target[1][0]
+ except IndexError:
+ if module == 'logging' and handler_class_name == 'StreamHandler':
+ handler_class_args = [stream]
+ else:
+ handler_class_args = ()
+ try:
+ handler_class_kwargs = log_target[1][1]
+ except IndexError:
+ handler_class_kwargs = {}
+ module = str(module)
+ handler_class_name = str(handler_class_name)
+ handler_class = get_module_attr(module, handler_class_name)
+ if not handler_class:
+ continue
+ handler = handler_class(*handler_class_args, **handler_class_kwargs)
+ try:
+ handler_level_name = log_target[2]
+ except IndexError:
+ handler_level_name = common_config['log_level']
+ try:
+ handler_format = log_target[3]
+ except IndexError:
+ handler_format = common_config['log_format']
+ handler.setLevel(getattr(logging, handler_level_name))
+ handler.setFormatter(logging.Formatter(handler_format))
+ logger.addHandler(handler)
+ num_handlers += 1
+ if num_handlers == 0 and log_targets:
+ raise ValueError('Failed to set up any handlers')
+
+
+def create_logger(common_config, use_daemon_threads=True, ext='__unknown__',
+ import_paths=None, imported_modules=None, stream=None):
+ '''Create logger according to provided configuration
+
+ :param dict common_config:
+ Common configuration, from :py:func:`finish_common_config`.
+ :param bool use_daemon_threads:
+ Whether daemon threads should be used. Argument to
+ :py:class:`PowerlineLogger` constructor.
+ :param str ext:
+ Used extension. Argument to :py:class:`PowerlineLogger` constructor.
+ :param set imported_modules:
+ Set where imported modules are saved. Argument to
+ :py:func:`gen_module_attr_getter`. May be ``None``, in this case new
+ empty set is used.
+ :param file stream:
+ Stream to use by default for :py:class:`logging.StreamHandler` in place
+ of :py:attr:`sys.stderr`. May be ``None``.
+
+ :return: Three objects:
+
+ #. :py:class:`logging.Logger` instance.
+ #. :py:class:`PowerlineLogger` instance.
+ #. Function, output of :py:func:`gen_module_attr_getter`.
+ '''
+ logger = logging.Logger('powerline')
+ level = getattr(logging, common_config['log_level'])
+ logger.setLevel(level)
+
+ pl = PowerlineLogger(use_daemon_threads, logger, ext)
+ get_module_attr = gen_module_attr_getter(
+ pl, common_config['paths'],
+ set() if imported_modules is None else imported_modules)
+
+ _set_log_handlers(common_config, logger, get_module_attr, stream)
+
+ return logger, pl, get_module_attr
+
+
+def get_default_theme(is_unicode=True):
+ '''Get default theme used by powerline
+
+ :param bool is_unicode:
+ If true, return theme for unicode environments, otherwise return theme
+ that is supposed to be ASCII-only.
+
+ :return: theme name.
+ '''
+ return 'powerline_terminus' if is_unicode else 'ascii'
+
+
+def finish_common_config(encoding, common_config):
+ '''Add default values to common config and expand ~ in paths
+
+ :param dict common_config:
+ Common configuration, as it was just loaded.
+
+ :return:
+ Copy of common configuration with all configuration keys and expanded
+ paths.
+ '''
+ encoding = encoding.lower()
+ default_top_theme = get_default_theme(
+ encoding.startswith('utf') or encoding.startswith('ucs'))
+
+ common_config = common_config.copy()
+ common_config.setdefault('default_top_theme', default_top_theme)
+ common_config.setdefault('paths', [])
+ common_config.setdefault('watcher', 'auto')
+ common_config.setdefault('log_level', 'WARNING')
+ common_config.setdefault('log_format', '%(asctime)s:%(levelname)s:%(message)s')
+ common_config.setdefault('term_truecolor', False)
+ common_config.setdefault('term_escape_style', 'auto')
+ common_config.setdefault('ambiwidth', 1)
+ common_config.setdefault('additional_escapes', None)
+ common_config.setdefault('reload_config', True)
+ common_config.setdefault('interval', None)
+ common_config.setdefault('log_file', [None])
+
+ if not isinstance(common_config['log_file'], list):
+ common_config['log_file'] = [common_config['log_file']]
+
+ common_config['paths'] = [
+ os.path.expanduser(path) for path in common_config['paths']
+ ]
+
+ return common_config
+
+
+if sys.version_info < (3,):
+ # `raise exception[0], None, exception[1]` is a SyntaxError in python-3*
+ # Not using ('''…''') because this syntax does not work in python-2.6
+ exec((
+ 'def reraise(exception):\n'
+ ' if type(exception) is tuple:\n'
+ ' raise exception[0], None, exception[1]\n'
+ ' else:\n'
+ ' raise exception\n'
+ ))
+else:
+ def reraise(exception):
+ if type(exception) is tuple:
+ raise exception[0].with_traceback(exception[1])
+ else:
+ raise exception
+
+
+def gen_module_attr_getter(pl, import_paths, imported_modules):
+ def get_module_attr(module, attr, prefix='powerline'):
+ '''Import module and get its attribute.
+
+ Replaces ``from {module} import {attr}``.
+
+ :param str module:
+ Module name, will be passed as first argument to ``__import__``.
+ :param str attr:
+ Module attribute, will be passed to ``__import__`` as the only value
+ in ``fromlist`` tuple.
+
+ :return:
+ Attribute value or ``None``. Note: there is no way to distinguish
+ between successful import of attribute equal to ``None`` and
+ unsuccessful import.
+ '''
+ oldpath = sys.path
+ sys.path = import_paths + sys.path
+ module = str(module)
+ attr = str(attr)
+ try:
+ imported_modules.add(module)
+ return getattr(__import__(module, fromlist=(attr,)), attr)
+ except Exception as e:
+ pl.exception('Failed to import attr {0} from module {1}: {2}', attr, module, str(e), prefix=prefix)
+ return None
+ finally:
+ sys.path = oldpath
+
+ return get_module_attr
+
+
+LOG_KEYS = set(('log_format', 'log_level', 'log_file', 'paths'))
+'''List of keys related to logging
+'''
+
+
+def _get_log_keys(common_config):
+ '''Return a common configuration copy with only log-related config left
+
+ :param dict common_config:
+ Common configuration.
+
+ :return:
+ :py:class:`dict` instance which has only keys from
+ :py:attr:`powerline.LOG_KEYS` left.
+ '''
+ return dict((
+ (k, v) for k, v in common_config.items() if k in LOG_KEYS
+ ))
+
+
+DEFAULT_UPDATE_INTERVAL = 2
+'''Default value for :ref:`update_interval <config-ext-update_interval>`
+'''
+
+
+class Powerline(object):
+ '''Main powerline class, entrance point for all powerline uses. Sets
+ powerline up and loads the configuration.
+
+ :param str ext:
+ extension used. Determines where configuration files will
+ searched and what renderer module will be used. Affected: used ``ext``
+ dictionary from :file:`powerline/config.json`, location of themes and
+ colorschemes, render module (``powerline.renders.{ext}``).
+ :param str renderer_module:
+ Overrides renderer module (defaults to ``ext``). Should be the name of
+ the package imported like this: ``powerline.renderers.{render_module}``.
+ If this parameter contains a dot ``powerline.renderers.`` is not
+ prepended. There is also a special case for renderers defined in
+ toplevel modules: ``foo.`` (note: dot at the end) tries to get renderer
+ from module ``foo`` (because ``foo`` (without dot) tries to get renderer
+ from module ``powerline.renderers.foo``). When ``.foo`` (with leading
+ dot) variant is used ``renderer_module`` will be
+ ``powerline.renderers.{ext}{renderer_module}``.
+ :param bool run_once:
+ Determines whether :py:meth:`render` method will be run only once
+ during python session.
+ :param Logger logger:
+ If present no new logger will be created and the provided logger will be
+ used.
+ :param bool use_daemon_threads:
+ When creating threads make them daemon ones.
+ :param Event shutdown_event:
+ Use this Event as shutdown_event instead of creating new event.
+ :param ConfigLoader config_loader:
+ Instance of the class that manages (re)loading of the configuration.
+ '''
+
+ def __init__(self, *args, **kwargs):
+ self.init_args = (args, kwargs)
+ self.init(*args, **kwargs)
+
+ def init(self,
+ ext,
+ renderer_module=None,
+ run_once=False,
+ logger=None,
+ use_daemon_threads=True,
+ shutdown_event=None,
+ config_loader=None):
+ '''Do actual initialization.
+
+ __init__ function only stores the arguments and runs this function. This
+ function exists for powerline to be able to reload itself: it is easier
+ to make ``__init__`` store arguments and call overridable ``init`` than
+ tell developers that each time they override Powerline.__init__ in
+ subclasses they must store actual arguments.
+ '''
+ self.ext = ext
+ self.run_once = run_once
+ self.logger = logger
+ self.had_logger = bool(self.logger)
+ self.use_daemon_threads = use_daemon_threads
+
+ if not renderer_module:
+ self.renderer_module = 'powerline.renderers.' + ext
+ elif '.' not in renderer_module:
+ self.renderer_module = 'powerline.renderers.' + renderer_module
+ elif renderer_module.startswith('.'):
+ self.renderer_module = 'powerline.renderers.' + ext + renderer_module
+ elif renderer_module.endswith('.'):
+ self.renderer_module = renderer_module[:-1]
+ else:
+ self.renderer_module = renderer_module
+
+ self.find_config_files = generate_config_finder(self.get_config_paths)
+
+ self.cr_kwargs_lock = Lock()
+ self.cr_kwargs = {}
+ self.cr_callbacks = {}
+ for key in ('main', 'colors', 'colorscheme', 'theme'):
+ self.cr_kwargs['load_' + key] = True
+ self.cr_callbacks[key] = _generate_change_callback(
+ self.cr_kwargs_lock,
+ 'load_' + key,
+ self.cr_kwargs
+ )
+
+ self.shutdown_event = shutdown_event or Event()
+ self.config_loader = config_loader or ConfigLoader(shutdown_event=self.shutdown_event, run_once=run_once)
+ self.run_loader_update = False
+
+ self.renderer_options = {}
+
+ self.prev_common_config = None
+ self.prev_ext_config = None
+ self.pl = None
+ self.setup_args = ()
+ self.setup_kwargs = {}
+ self.imported_modules = set()
+ self.update_interval = DEFAULT_UPDATE_INTERVAL
+
+ get_encoding = staticmethod(get_preferred_output_encoding)
+ '''Get encoding used by the current application
+
+ Usually returns encoding of the current locale.
+ '''
+
+ def create_logger(self):
+ '''Create logger
+
+ This function is used to create logger unless it was already specified
+ at initialization.
+
+ :return: Three objects:
+
+ #. :py:class:`logging.Logger` instance.
+ #. :py:class:`PowerlineLogger` instance.
+ #. Function, output of :py:func:`gen_module_attr_getter`.
+ '''
+ return create_logger(
+ common_config=self.common_config,
+ use_daemon_threads=self.use_daemon_threads,
+ ext=self.ext,
+ imported_modules=self.imported_modules,
+ stream=self.default_log_stream,
+ )
+
+ def create_renderer(self, load_main=False, load_colors=False, load_colorscheme=False, load_theme=False):
+ '''(Re)create renderer object. Can be used after Powerline object was
+ successfully initialized. If any of the below parameters except
+ ``load_main`` is True renderer object will be recreated.
+
+ :param bool load_main:
+ Determines whether main configuration file (:file:`config.json`)
+ should be loaded. If appropriate configuration changes implies
+ ``load_colorscheme`` and ``load_theme`` and recreation of renderer
+ object. Won’t trigger recreation if only unrelated configuration
+ changed.
+ :param bool load_colors:
+ Determines whether colors configuration from :file:`colors.json`
+ should be (re)loaded.
+ :param bool load_colorscheme:
+ Determines whether colorscheme configuration should be (re)loaded.
+ :param bool load_theme:
+ Determines whether theme configuration should be reloaded.
+ '''
+ common_config_differs = False
+ ext_config_differs = False
+ if load_main:
+ self._purge_configs('main')
+ config = self.load_main_config()
+ self.common_config = finish_common_config(self.get_encoding(), config['common'])
+ if self.common_config != self.prev_common_config:
+ common_config_differs = True
+
+ load_theme = (load_theme
+ or not self.prev_common_config
+ or self.prev_common_config['default_top_theme'] != self.common_config['default_top_theme'])
+
+ log_keys_differ = (not self.prev_common_config or (
+ _get_log_keys(self.prev_common_config) != _get_log_keys(self.common_config)
+ ))
+
+ self.prev_common_config = self.common_config
+
+ if log_keys_differ:
+ if self.had_logger:
+ self.pl = PowerlineLogger(self.use_daemon_threads, self.logger, self.ext)
+ self.get_module_attr = gen_module_attr_getter(
+ self.pl, self.common_config['paths'], self.imported_modules)
+ else:
+ self.logger, self.pl, self.get_module_attr = self.create_logger()
+ self.config_loader.pl = self.pl
+
+ if not self.run_once:
+ self.config_loader.set_watcher(self.common_config['watcher'])
+
+ mergedicts(self.renderer_options, dict(
+ pl=self.pl,
+ term_truecolor=self.common_config['term_truecolor'],
+ term_escape_style=self.common_config['term_escape_style'],
+ ambiwidth=self.common_config['ambiwidth'],
+ tmux_escape=self.common_config['additional_escapes'] == 'tmux',
+ screen_escape=self.common_config['additional_escapes'] == 'screen',
+ theme_kwargs={
+ 'ext': self.ext,
+ 'common_config': self.common_config,
+ 'run_once': self.run_once,
+ 'shutdown_event': self.shutdown_event,
+ 'get_module_attr': self.get_module_attr,
+ },
+ ))
+
+ if not self.run_once and self.common_config['reload_config']:
+ interval = self.common_config['interval']
+ self.config_loader.set_interval(interval)
+ self.run_loader_update = (interval is None)
+ if interval is not None and not self.config_loader.is_alive():
+ self.config_loader.start()
+
+ self.ext_config = config['ext'][self.ext]
+
+ top_theme = (
+ self.ext_config.get('top_theme')
+ or self.common_config['default_top_theme']
+ )
+ self.theme_levels = (
+ os.path.join('themes', top_theme),
+ os.path.join('themes', self.ext, '__main__'),
+ )
+ self.renderer_options['theme_kwargs']['top_theme'] = top_theme
+
+ if self.ext_config != self.prev_ext_config:
+ ext_config_differs = True
+ if (
+ not self.prev_ext_config
+ or self.ext_config.get('components') != self.prev_ext_config.get('components')
+ ):
+ self.setup_components(self.ext_config.get('components'))
+ if (
+ not self.prev_ext_config
+ or self.ext_config.get('local_themes') != self.prev_ext_config.get('local_themes')
+ ):
+ self.renderer_options['local_themes'] = self.get_local_themes(self.ext_config.get('local_themes'))
+ self.update_interval = self.ext_config.get('update_interval', 2)
+ load_colorscheme = (
+ load_colorscheme
+ or not self.prev_ext_config
+ or self.prev_ext_config['colorscheme'] != self.ext_config['colorscheme']
+ )
+ load_theme = (
+ load_theme
+ or not self.prev_ext_config
+ or self.prev_ext_config['theme'] != self.ext_config['theme']
+ )
+ self.prev_ext_config = self.ext_config
+
+ create_renderer = load_colors or load_colorscheme or load_theme or common_config_differs or ext_config_differs
+
+ if load_colors:
+ self._purge_configs('colors')
+ self.colors_config = self.load_colors_config()
+
+ if load_colorscheme or load_colors:
+ self._purge_configs('colorscheme')
+ if load_colorscheme:
+ self.colorscheme_config = self.load_colorscheme_config(self.ext_config['colorscheme'])
+ self.renderer_options['theme_kwargs']['colorscheme'] = (
+ Colorscheme(self.colorscheme_config, self.colors_config))
+
+ if load_theme:
+ self._purge_configs('theme')
+ self.renderer_options['theme_config'] = self.load_theme_config(self.ext_config.get('theme', 'default'))
+
+ if create_renderer:
+ Renderer = self.get_module_attr(self.renderer_module, 'renderer')
+ if not Renderer:
+ if hasattr(self, 'renderer'):
+ return
+ else:
+ raise ImportError('Failed to obtain renderer')
+
+ # Renderer updates configuration file via segments’ .startup thus it
+ # should be locked to prevent state when configuration was updated,
+ # but .render still uses old renderer.
+ try:
+ renderer = Renderer(**self.renderer_options)
+ except Exception as e:
+ self.exception('Failed to construct renderer object: {0}', str(e))
+ if not hasattr(self, 'renderer'):
+ raise
+ else:
+ self.renderer = renderer
+
+ default_log_stream = sys.stdout
+ '''Default stream for default log handler
+
+ Usually it is ``sys.stderr``, but there is sometimes a reason to prefer
+ ``sys.stdout`` or a custom file-like object. It is not supposed to be used
+ to write to some file.
+ '''
+
+ def setup_components(self, components):
+ '''Run component-specific setup
+
+ :param set components:
+ Set of the enabled components or None.
+
+ Should be overridden by subclasses.
+ '''
+ pass
+
+ @staticmethod
+ def get_config_paths():
+ '''Get configuration paths.
+
+ Should be overridden in subclasses in order to provide a way to override
+ used paths.
+
+ :return: list of paths
+ '''
+ return get_config_paths()
+
+ def load_config(self, cfg_path, cfg_type):
+ '''Load configuration and setup watches
+
+ :param str cfg_path:
+ Path to the configuration file without any powerline configuration
+ directory or ``.json`` suffix.
+ :param str cfg_type:
+ Configuration type. May be one of ``main`` (for ``config.json``
+ file), ``colors``, ``colorscheme``, ``theme``.
+
+ :return: dictionary with loaded configuration.
+ '''
+ return load_config(
+ cfg_path,
+ self.find_config_files,
+ self.config_loader,
+ self.cr_callbacks[cfg_type]
+ )
+
+ def _purge_configs(self, cfg_type):
+ function = self.cr_callbacks[cfg_type]
+ self.config_loader.unregister_functions(set((function,)))
+ self.config_loader.unregister_missing(set(((self.find_config_files, function),)))
+
+ def load_main_config(self):
+ '''Get top-level configuration.
+
+ :return: dictionary with :ref:`top-level configuration <config-main>`.
+ '''
+ return self.load_config('config', 'main')
+
+ def _load_hierarhical_config(self, cfg_type, levels, ignore_levels):
+ '''Load and merge multiple configuration files
+
+ :param str cfg_type:
+ Type of the loaded configuration files (e.g. ``colorscheme``,
+ ``theme``).
+ :param list levels:
+ Configuration names resembling levels in hierarchy, sorted by
+ priority. Configuration file names with higher priority should go
+ last.
+ :param set ignore_levels:
+ If only files listed in this variable are present then configuration
+ file is considered not loaded: at least one file on the level not
+ listed in this variable must be present.
+ '''
+ config = {}
+ loaded = 0
+ exceptions = []
+ for i, cfg_path in enumerate(levels):
+ try:
+ lvl_config = self.load_config(cfg_path, cfg_type)
+ except IOError as e:
+ if sys.version_info < (3,):
+ tb = sys.exc_info()[2]
+ exceptions.append((e, tb))
+ else:
+ exceptions.append(e)
+ else:
+ if i not in ignore_levels:
+ loaded += 1
+ mergedicts(config, lvl_config)
+ if not loaded:
+ for exception in exceptions:
+ if type(exception) is tuple:
+ e = exception[0]
+ else:
+ e = exception
+ self.exception('Failed to load %s: {0}' % cfg_type, e, exception=exception)
+ raise e
+ return config
+
+ def load_colorscheme_config(self, name):
+ '''Get colorscheme.
+
+ :param str name:
+ Name of the colorscheme to load.
+
+ :return: dictionary with :ref:`colorscheme configuration <config-colorschemes>`.
+ '''
+ levels = (
+ os.path.join('colorschemes', name),
+ os.path.join('colorschemes', self.ext, '__main__'),
+ os.path.join('colorschemes', self.ext, name),
+ )
+ return self._load_hierarhical_config('colorscheme', levels, (1,))
+
+ def load_theme_config(self, name):
+ '''Get theme configuration.
+
+ :param str name:
+ Name of the theme to load.
+
+ :return: dictionary with :ref:`theme configuration <config-themes>`
+ '''
+ levels = self.theme_levels + (
+ os.path.join('themes', self.ext, name),
+ )
+ return self._load_hierarhical_config('theme', levels, (0, 1,))
+
+ def load_colors_config(self):
+ '''Get colorscheme.
+
+ :return: dictionary with :ref:`colors configuration <config-colors>`.
+ '''
+ return self.load_config('colors', 'colors')
+
+ @staticmethod
+ def get_local_themes(local_themes):
+ '''Get local themes. No-op here, to be overridden in subclasses if
+ required.
+
+ :param dict local_themes:
+ Usually accepts ``{matcher_name : theme_name}``. May also receive
+ None in case there is no local_themes configuration.
+
+ :return:
+ anything accepted by ``self.renderer.get_theme`` and processable by
+ ``self.renderer.add_local_theme``. Renderer module is determined by
+ ``__init__`` arguments, refer to its documentation.
+ '''
+ return None
+
+ def update_renderer(self):
+ '''Updates/creates a renderer if needed.'''
+ if self.run_loader_update:
+ self.config_loader.update()
+ cr_kwargs = None
+ with self.cr_kwargs_lock:
+ if self.cr_kwargs:
+ cr_kwargs = self.cr_kwargs.copy()
+ if cr_kwargs:
+ try:
+ self.create_renderer(**cr_kwargs)
+ except Exception as e:
+ self.exception('Failed to create renderer: {0}', str(e))
+ if hasattr(self, 'renderer'):
+ with self.cr_kwargs_lock:
+ self.cr_kwargs.clear()
+ else:
+ raise
+ else:
+ with self.cr_kwargs_lock:
+ self.cr_kwargs.clear()
+
+ def render(self, *args, **kwargs):
+ '''Update/create renderer if needed and pass all arguments further to
+ ``self.renderer.render()``.
+ '''
+ try:
+ self.update_renderer()
+ return self.renderer.render(*args, **kwargs)
+ except Exception as e:
+ exc = e
+ try:
+ self.exception('Failed to render: {0}', str(e))
+ except Exception as e:
+ exc = e
+ ret = FailedUnicode(safe_unicode(exc))
+ if kwargs.get('output_width', False):
+ ret = ret, len(ret)
+ return ret
+
+ def render_above_lines(self, *args, **kwargs):
+ '''Like .render(), but for ``self.renderer.render_above_lines()``
+ '''
+ try:
+ self.update_renderer()
+ for line in self.renderer.render_above_lines(*args, **kwargs):
+ yield line
+ except Exception as e:
+ exc = e
+ try:
+ self.exception('Failed to render: {0}', str(e))
+ except Exception as e:
+ exc = e
+ yield FailedUnicode(safe_unicode(exc))
+
+ def setup(self, *args, **kwargs):
+ '''Setup the environment to use powerline.
+
+ Must not be overridden by subclasses. This one only saves setup
+ arguments for :py:meth:`reload` method and calls :py:meth:`do_setup`.
+ '''
+ self.shutdown_event.clear()
+ self.setup_args = args
+ self.setup_kwargs.update(kwargs)
+ self.do_setup(*args, **kwargs)
+
+ @staticmethod
+ def do_setup():
+ '''Function that does initialization
+
+ Should be overridden by subclasses. May accept any number of regular or
+ keyword arguments.
+ '''
+ pass
+
+ def reload(self):
+ '''Reload powerline after update.
+
+ Should handle most (but not all) powerline updates.
+
+ Purges out all powerline modules and modules imported by powerline for
+ segment and matcher functions. Requires defining ``setup`` function that
+ updates reference to main powerline object.
+
+ .. warning::
+ Not guaranteed to work properly, use it at your own risk. It
+ may break your python code.
+ '''
+ import sys
+ modules = self.imported_modules | set((module for module in sys.modules if module.startswith('powerline')))
+ modules_holder = []
+ for module in modules:
+ try:
+ # Needs to hold module to prevent garbage collecting until they
+ # are all reloaded.
+ modules_holder.append(sys.modules.pop(module))
+ except KeyError:
+ pass
+ PowerlineClass = getattr(__import__(self.__module__, fromlist=(self.__class__.__name__,)), self.__class__.__name__)
+ self.shutdown(set_event=True)
+ init_args, init_kwargs = self.init_args
+ powerline = PowerlineClass(*init_args, **init_kwargs)
+ powerline.setup(*self.setup_args, **self.setup_kwargs)
+
+ def shutdown(self, set_event=True):
+ '''Shut down all background threads.
+
+ :param bool set_event:
+ Set ``shutdown_event`` and call ``renderer.shutdown`` which should
+ shut down all threads. Set it to False unless you are exiting an
+ application.
+
+ If set to False this does nothing more then resolving reference
+ cycle ``powerline → config_loader → bound methods → powerline`` by
+ unsubscribing from config_loader events.
+ '''
+ if set_event:
+ self.shutdown_event.set()
+ try:
+ self.renderer.shutdown()
+ except AttributeError:
+ pass
+ functions = tuple(self.cr_callbacks.values())
+ self.config_loader.unregister_functions(set(functions))
+ self.config_loader.unregister_missing(set(((self.find_config_files, function) for function in functions)))
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args):
+ self.shutdown()
+
+ def exception(self, msg, *args, **kwargs):
+ if 'prefix' not in kwargs:
+ kwargs['prefix'] = 'powerline'
+ exception = kwargs.pop('exception', None)
+ pl = getattr(self, 'pl', None) or get_fallback_logger(self.default_log_stream)
+ if exception:
+ try:
+ reraise(exception)
+ except Exception:
+ return pl.exception(msg, *args, **kwargs)
+ return pl.exception(msg, *args, **kwargs)