diff options
Diffstat (limited to 'cli_helpers')
-rw-r--r-- | cli_helpers/__init__.py | 1 | ||||
-rw-r--r-- | cli_helpers/compat.py | 56 | ||||
-rw-r--r-- | cli_helpers/config.py | 301 | ||||
-rw-r--r-- | cli_helpers/tabular_output/__init__.py | 13 | ||||
-rw-r--r-- | cli_helpers/tabular_output/delimited_output_adapter.py | 56 | ||||
-rw-r--r-- | cli_helpers/tabular_output/output_formatter.py | 255 | ||||
-rw-r--r-- | cli_helpers/tabular_output/preprocessors.py | 353 | ||||
-rw-r--r-- | cli_helpers/tabular_output/tabulate_adapter.py | 212 | ||||
-rw-r--r-- | cli_helpers/tabular_output/tsv_output_adapter.py | 17 | ||||
-rw-r--r-- | cli_helpers/tabular_output/vertical_table_adapter.py | 67 | ||||
-rw-r--r-- | cli_helpers/utils.py | 114 |
11 files changed, 1445 insertions, 0 deletions
diff --git a/cli_helpers/__init__.py b/cli_helpers/__init__.py new file mode 100644 index 0000000..55e4709 --- /dev/null +++ b/cli_helpers/__init__.py @@ -0,0 +1 @@ +__version__ = "2.3.0" diff --git a/cli_helpers/compat.py b/cli_helpers/compat.py new file mode 100644 index 0000000..422403c --- /dev/null +++ b/cli_helpers/compat.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +"""OS and Python compatibility support.""" + +from decimal import Decimal +from types import SimpleNamespace +import sys + +PY2 = sys.version_info[0] == 2 +WIN = sys.platform.startswith("win") +MAC = sys.platform == "darwin" + + +if PY2: + text_type = unicode + binary_type = str + long_type = long + int_types = (int, long) + + from UserDict import UserDict + from backports import csv + + from StringIO import StringIO + from itertools import izip_longest as zip_longest +else: + text_type = str + binary_type = bytes + long_type = int + int_types = (int,) + + from collections import UserDict + import csv + from io import StringIO + from itertools import zip_longest + + +HAS_PYGMENTS = True +try: + from pygments.token import Token + from pygments.formatters.terminal256 import Terminal256Formatter +except ImportError: + HAS_PYGMENTS = False + Terminal256Formatter = None + Token = SimpleNamespace() + Token.Output = SimpleNamespace() + Token.Output.Header = None + Token.Output.OddRow = None + Token.Output.EvenRow = None + Token.Output.Null = None + Token.Output.TableSeparator = None + Token.Results = SimpleNamespace() + Token.Results.Header = None + Token.Results.OddRow = None + Token.Results.EvenRow = None + + +float_types = (float, Decimal) diff --git a/cli_helpers/config.py b/cli_helpers/config.py new file mode 100644 index 0000000..7669717 --- /dev/null +++ b/cli_helpers/config.py @@ -0,0 +1,301 @@ +# -*- coding: utf-8 -*- +"""Read and write an application's config files.""" + +from __future__ import unicode_literals +import io +import logging +import os + +from configobj import ConfigObj, ConfigObjError +from validate import ValidateError, Validator + +from .compat import MAC, text_type, UserDict, WIN + +logger = logging.getLogger(__name__) + + +class ConfigError(Exception): + """Base class for exceptions in this module.""" + + pass + + +class DefaultConfigValidationError(ConfigError): + """Indicates the default config file did not validate correctly.""" + + pass + + +class Config(UserDict, object): + """Config reader/writer class. + + :param str app_name: The application's name. + :param str app_author: The application author/organization. + :param str filename: The config filename to look for (e.g. ``config``). + :param dict/str default: The default config values or absolute path to + config file. + :param bool validate: Whether or not to validate the config file. + :param bool write_default: Whether or not to write the default config + file to the user config directory if it doesn't + already exist. + :param tuple additional_dirs: Additional directories to check for a config + file. + """ + + def __init__( + self, + app_name, + app_author, + filename, + default=None, + validate=False, + write_default=False, + additional_dirs=(), + ): + super(Config, self).__init__() + #: The :class:`ConfigObj` instance. + self.data = ConfigObj(encoding="utf8") + + self.default = {} + self.default_file = self.default_config = None + self.config_filenames = [] + + self.app_name, self.app_author = app_name, app_author + self.filename = filename + self.write_default = write_default + self.validate = validate + self.additional_dirs = additional_dirs + + if isinstance(default, dict): + self.default = default + self.update(default) + elif isinstance(default, text_type): + self.default_file = default + elif default is not None: + raise TypeError( + '"default" must be a dict or {}, not {}'.format( + text_type.__name__, type(default) + ) + ) + + if self.write_default and not self.default_file: + raise ValueError( + 'Cannot use "write_default" without specifying ' "a default file." + ) + + if self.validate and not self.default_file: + raise ValueError( + 'Cannot use "validate" without specifying a ' "default file." + ) + + def read_default_config(self): + """Read the default config file. + + :raises DefaultConfigValidationError: There was a validation error with + the *default* file. + """ + if self.validate: + self.default_config = ConfigObj( + configspec=self.default_file, + list_values=False, + _inspec=True, + encoding="utf8", + ) + # ConfigObj does not set the encoding on the configspec. + self.default_config.configspec.encoding = "utf8" + + valid = self.default_config.validate( + Validator(), copy=True, preserve_errors=True + ) + if valid is not True: + for name, section in valid.items(): + if section is True: + continue + for key, value in section.items(): + if isinstance(value, ValidateError): + raise DefaultConfigValidationError( + 'section [{}], key "{}": {}'.format(name, key, value) + ) + elif self.default_file: + self.default_config, _ = self.read_config_file(self.default_file) + + self.update(self.default_config) + + def read(self): + """Read the default, additional, system, and user config files. + + :raises DefaultConfigValidationError: There was a validation error with + the *default* file. + """ + if self.default_file: + self.read_default_config() + return self.read_config_files(self.all_config_files()) + + def user_config_file(self): + """Get the absolute path to the user config file.""" + return os.path.join( + get_user_config_dir(self.app_name, self.app_author), self.filename + ) + + def system_config_files(self): + """Get a list of absolute paths to the system config files.""" + return [ + os.path.join(f, self.filename) + for f in get_system_config_dirs(self.app_name, self.app_author) + ] + + def additional_files(self): + """Get a list of absolute paths to the additional config files.""" + return [os.path.join(f, self.filename) for f in self.additional_dirs] + + def all_config_files(self): + """Get a list of absolute paths to all the config files.""" + return ( + self.additional_files() + + self.system_config_files() + + [self.user_config_file()] + ) + + def write_default_config(self, overwrite=False): + """Write the default config to the user's config file. + + :param bool overwrite: Write over an existing config if it exists. + """ + destination = self.user_config_file() + if not overwrite and os.path.exists(destination): + return + + with io.open(destination, mode="wb") as f: + self.default_config.write(f) + + def write(self, outfile=None, section=None): + """Write the current config to a file (defaults to user config). + + :param str outfile: The path to the file to write to. + :param None/str section: The config section to write, or :data:`None` + to write the entire config. + """ + with io.open(outfile or self.user_config_file(), "wb") as f: + self.data.write(outfile=f, section=section) + + def read_config_file(self, f): + """Read a config file *f*. + + :param str f: The path to a file to read. + """ + configspec = self.default_file if self.validate else None + try: + config = ConfigObj( + infile=f, configspec=configspec, interpolation=False, encoding="utf8" + ) + # ConfigObj does not set the encoding on the configspec. + if config.configspec is not None: + config.configspec.encoding = "utf8" + except ConfigObjError as e: + logger.warning( + "Unable to parse line {} of config file {}".format(e.line_number, f) + ) + config = e.config + + valid = True + if self.validate: + valid = config.validate(Validator(), preserve_errors=True, copy=True) + if bool(config): + self.config_filenames.append(config.filename) + + return config, valid + + def read_config_files(self, files): + """Read a list of config files. + + :param iterable files: An iterable (e.g. list) of files to read. + """ + errors = {} + for _file in files: + config, valid = self.read_config_file(_file) + self.update(config) + if valid is not True: + errors[_file] = valid + return errors or True + + +def get_user_config_dir(app_name, app_author, roaming=True, force_xdg=True): + """Returns the config folder for the application. The default behavior + is to return whatever is most appropriate for the operating system. + + For an example application called ``"My App"`` by ``"Acme"``, + something like the following folders could be returned: + + macOS (non-XDG): + ``~/Library/Application Support/My App`` + Mac OS X (XDG): + ``~/.config/my-app`` + Unix: + ``~/.config/my-app`` + Windows 7 (roaming): + ``C:\\Users\\<user>\\AppData\\Roaming\\Acme\\My App`` + Windows 7 (not roaming): + ``C:\\Users\\<user>\\AppData\\Local\\Acme\\My App`` + + :param app_name: the application name. This should be properly capitalized + and can contain whitespace. + :param app_author: The app author's name (or company). This should be + properly capitalized and can contain whitespace. + :param roaming: controls if the folder should be roaming or not on Windows. + Has no effect on non-Windows systems. + :param force_xdg: if this is set to `True`, then on macOS the XDG Base + Directory Specification will be followed. Has no effect + on non-macOS systems. + + """ + if WIN: + key = "APPDATA" if roaming else "LOCALAPPDATA" + folder = os.path.expanduser(os.environ.get(key, "~")) + return os.path.join(folder, app_author, app_name) + if MAC and not force_xdg: + return os.path.join( + os.path.expanduser("~/Library/Application Support"), app_name + ) + return os.path.join( + os.path.expanduser(os.environ.get("XDG_CONFIG_HOME", "~/.config")), + _pathify(app_name), + ) + + +def get_system_config_dirs(app_name, app_author, force_xdg=True): + r"""Returns a list of system-wide config folders for the application. + + For an example application called ``"My App"`` by ``"Acme"``, + something like the following folders could be returned: + + macOS (non-XDG): + ``['/Library/Application Support/My App']`` + Mac OS X (XDG): + ``['/etc/xdg/my-app']`` + Unix: + ``['/etc/xdg/my-app']`` + Windows 7: + ``['C:\ProgramData\Acme\My App']`` + + :param app_name: the application name. This should be properly capitalized + and can contain whitespace. + :param app_author: The app author's name (or company). This should be + properly capitalized and can contain whitespace. + :param force_xdg: if this is set to `True`, then on macOS the XDG Base + Directory Specification will be followed. Has no effect + on non-macOS systems. + + """ + if WIN: + folder = os.environ.get("PROGRAMDATA") + return [os.path.join(folder, app_author, app_name)] + if MAC and not force_xdg: + return [os.path.join("/Library/Application Support", app_name)] + dirs = os.environ.get("XDG_CONFIG_DIRS", "/etc/xdg") + paths = [os.path.expanduser(x) for x in dirs.split(os.pathsep)] + return [os.path.join(d, _pathify(app_name)) for d in paths] + + +def _pathify(s): + """Convert spaces to hyphens and lowercase a string.""" + return "-".join(s.split()).lower() diff --git a/cli_helpers/tabular_output/__init__.py b/cli_helpers/tabular_output/__init__.py new file mode 100644 index 0000000..e4247bd --- /dev/null +++ b/cli_helpers/tabular_output/__init__.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +"""CLI Helper's tabular output module makes it easy to format your data using +various formatting libraries. + +When formatting data, you'll primarily use the +:func:`~cli_helpers.tabular_output.format_output` function and +:class:`~cli_helpers.tabular_output.TabularOutputFormatter` class. + +""" + +from .output_formatter import format_output, TabularOutputFormatter + +__all__ = ["format_output", "TabularOutputFormatter"] diff --git a/cli_helpers/tabular_output/delimited_output_adapter.py b/cli_helpers/tabular_output/delimited_output_adapter.py new file mode 100644 index 0000000..b812456 --- /dev/null +++ b/cli_helpers/tabular_output/delimited_output_adapter.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +"""A delimited data output adapter (e.g. CSV, TSV).""" + +from __future__ import unicode_literals +import contextlib + +from cli_helpers.compat import csv, StringIO +from cli_helpers.utils import filter_dict_by_key +from .preprocessors import bytes_to_string, override_missing_value + +supported_formats = ("csv", "csv-tab") +preprocessors = (override_missing_value, bytes_to_string) + + +class linewriter(object): + def __init__(self): + self.reset() + + def reset(self): + self.line = None + + def write(self, d): + self.line = d + + +def adapter(data, headers, table_format="csv", **kwargs): + """Wrap the formatting inside a function for TabularOutputFormatter.""" + keys = ( + "dialect", + "delimiter", + "doublequote", + "escapechar", + "quotechar", + "quoting", + "skipinitialspace", + "strict", + ) + if table_format == "csv": + delimiter = "," + elif table_format == "csv-tab": + delimiter = "\t" + else: + raise ValueError("Invalid table_format specified.") + + ckwargs = {"delimiter": delimiter, "lineterminator": ""} + ckwargs.update(filter_dict_by_key(kwargs, keys)) + + l = linewriter() + writer = csv.writer(l, **ckwargs) + writer.writerow(headers) + yield l.line + + for row in data: + l.reset() + writer.writerow(row) + yield l.line diff --git a/cli_helpers/tabular_output/output_formatter.py b/cli_helpers/tabular_output/output_formatter.py new file mode 100644 index 0000000..6cadf6c --- /dev/null +++ b/cli_helpers/tabular_output/output_formatter.py @@ -0,0 +1,255 @@ +# -*- coding: utf-8 -*- +"""A generic tabular data output formatter interface.""" + +from __future__ import unicode_literals +from collections import namedtuple + +from cli_helpers.compat import ( + text_type, + binary_type, + int_types, + float_types, + zip_longest, +) +from cli_helpers.utils import unique_items +from . import ( + delimited_output_adapter, + vertical_table_adapter, + tabulate_adapter, + tsv_output_adapter, +) +from decimal import Decimal + +import itertools + +MISSING_VALUE = "<null>" +MAX_FIELD_WIDTH = 500 + +TYPES = { + type(None): 0, + bool: 1, + int: 2, + float: 3, + Decimal: 3, + binary_type: 4, + text_type: 5, +} + +OutputFormatHandler = namedtuple( + "OutputFormatHandler", "format_name preprocessors formatter formatter_args" +) + + +class TabularOutputFormatter(object): + """An interface to various tabular data formatting libraries. + + The formatting libraries supported include: + - `tabulate <https://bitbucket.org/astanin/python-tabulate>`_ + - `terminaltables <https://robpol86.github.io/terminaltables/>`_ + - a CLI Helper vertical table layout + - delimited formats (CSV and TSV) + + :param str format_name: An optional, default format name. + + Usage:: + + >>> from cli_helpers.tabular_output import TabularOutputFormatter + >>> formatter = TabularOutputFormatter(format_name='simple') + >>> data = ((1, 87), (2, 80), (3, 79)) + >>> headers = ('day', 'temperature') + >>> print(formatter.format_output(data, headers)) + day temperature + ----- ------------- + 1 87 + 2 80 + 3 79 + + You can use any :term:`iterable` for the data or headers:: + + >>> data = enumerate(('87', '80', '79'), 1) + >>> print(formatter.format_output(data, headers)) + day temperature + ----- ------------- + 1 87 + 2 80 + 3 79 + + """ + + _output_formats = {} + + def __init__(self, format_name=None): + """Set the default *format_name*.""" + self._format_name = None + + if format_name: + self.format_name = format_name + + @property + def format_name(self): + """The current format name. + + This value must be in :data:`supported_formats`. + + """ + return self._format_name + + @format_name.setter + def format_name(self, format_name): + """Set the default format name. + + :param str format_name: The display format name. + :raises ValueError: if the format is not recognized. + + """ + if format_name in self.supported_formats: + self._format_name = format_name + else: + raise ValueError('unrecognized format_name "{}"'.format(format_name)) + + @property + def supported_formats(self): + """The names of the supported output formats in a :class:`tuple`.""" + return tuple(self._output_formats.keys()) + + @classmethod + def register_new_formatter( + cls, format_name, handler, preprocessors=(), kwargs=None + ): + """Register a new output formatter. + + :param str format_name: The name of the format. + :param callable handler: The function that formats the data. + :param tuple preprocessors: The preprocessors to call before + formatting. + :param dict kwargs: Keys/values for keyword argument defaults. + + """ + cls._output_formats[format_name] = OutputFormatHandler( + format_name, preprocessors, handler, kwargs or {} + ) + + def format_output( + self, + data, + headers, + format_name=None, + preprocessors=(), + column_types=None, + **kwargs + ): + r"""Format the headers and data using a specific formatter. + + *format_name* must be a supported formatter (see + :attr:`supported_formats`). + + :param iterable data: An :term:`iterable` (e.g. list) of rows. + :param iterable headers: The column headers. + :param str format_name: The display format to use (optional, if the + :class:`TabularOutputFormatter` object has a default format set). + :param tuple preprocessors: Additional preprocessors to call before + any formatter preprocessors. + :param \*\*kwargs: Optional arguments for the formatter. + :return: The formatted data. + :rtype: str + :raises ValueError: If the *format_name* is not recognized. + + """ + format_name = format_name or self._format_name + if format_name not in self.supported_formats: + raise ValueError('unrecognized format "{}"'.format(format_name)) + + (_, _preprocessors, formatter, fkwargs) = self._output_formats[format_name] + fkwargs.update(kwargs) + if column_types is None: + data = list(data) + column_types = self._get_column_types(data) + for f in unique_items(preprocessors + _preprocessors): + data, headers = f(data, headers, column_types=column_types, **fkwargs) + return formatter(list(data), headers, column_types=column_types, **fkwargs) + + def _get_column_types(self, data): + """Get a list of the data types for each column in *data*.""" + columns = list(zip_longest(*data)) + return [self._get_column_type(column) for column in columns] + + def _get_column_type(self, column): + """Get the most generic data type for iterable *column*.""" + type_values = [TYPES[self._get_type(v)] for v in column] + inverse_types = {v: k for k, v in TYPES.items()} + return inverse_types[max(type_values)] + + def _get_type(self, value): + """Get the data type for *value*.""" + if value is None: + return type(None) + elif type(value) in int_types: + return int + elif type(value) in float_types: + return float + elif isinstance(value, binary_type): + return binary_type + else: + return text_type + + +def format_output(data, headers, format_name, **kwargs): + r"""Format output using *format_name*. + + This is a wrapper around the :class:`TabularOutputFormatter` class. + + :param iterable data: An :term:`iterable` (e.g. list) of rows. + :param iterable headers: The column headers. + :param str format_name: The display format to use. + :param \*\*kwargs: Optional arguments for the formatter. + :return: The formatted data. + :rtype: str + + """ + formatter = TabularOutputFormatter(format_name=format_name) + return formatter.format_output(data, headers, **kwargs) + + +for vertical_format in vertical_table_adapter.supported_formats: + TabularOutputFormatter.register_new_formatter( + vertical_format, + vertical_table_adapter.adapter, + vertical_table_adapter.preprocessors, + { + "table_format": vertical_format, + "missing_value": MISSING_VALUE, + "max_field_width": None, + }, + ) + +for delimited_format in delimited_output_adapter.supported_formats: + TabularOutputFormatter.register_new_formatter( + delimited_format, + delimited_output_adapter.adapter, + delimited_output_adapter.preprocessors, + { + "table_format": delimited_format, + "missing_value": "", + "max_field_width": None, + }, + ) + +for tabulate_format in tabulate_adapter.supported_formats: + TabularOutputFormatter.register_new_formatter( + tabulate_format, + tabulate_adapter.adapter, + tabulate_adapter.get_preprocessors(tabulate_format), + { + "table_format": tabulate_format, + "missing_value": MISSING_VALUE, + "max_field_width": MAX_FIELD_WIDTH, + }, + ), + +for tsv_format in tsv_output_adapter.supported_formats: + TabularOutputFormatter.register_new_formatter( + tsv_format, + tsv_output_adapter.adapter, + tsv_output_adapter.preprocessors, + {"table_format": tsv_format, "missing_value": "", "max_field_width": None}, + ) diff --git a/cli_helpers/tabular_output/preprocessors.py b/cli_helpers/tabular_output/preprocessors.py new file mode 100644 index 0000000..8342d67 --- /dev/null +++ b/cli_helpers/tabular_output/preprocessors.py @@ -0,0 +1,353 @@ +# -*- coding: utf-8 -*- +"""These preprocessor functions are used to process data prior to output.""" + +import string + +from cli_helpers import utils +from cli_helpers.compat import text_type, int_types, float_types, HAS_PYGMENTS, Token + + +def truncate_string( + data, headers, max_field_width=None, skip_multiline_string=True, **_ +): + """Truncate very long strings. Only needed for tabular + representation, because trying to tabulate very long data + is problematic in terms of performance, and does not make any + sense visually. + + :param iterable data: An :term:`iterable` (e.g. list) of rows. + :param iterable headers: The column headers. + :param int max_field_width: Width to truncate field for display + :return: The processed data and headers. + :rtype: tuple + """ + return ( + ( + [ + utils.truncate_string(v, max_field_width, skip_multiline_string) + for v in row + ] + for row in data + ), + [ + utils.truncate_string(h, max_field_width, skip_multiline_string) + for h in headers + ], + ) + + +def convert_to_string(data, headers, **_): + """Convert all *data* and *headers* to strings. + + Binary data that cannot be decoded is converted to a hexadecimal + representation via :func:`binascii.hexlify`. + + :param iterable data: An :term:`iterable` (e.g. list) of rows. + :param iterable headers: The column headers. + :return: The processed data and headers. + :rtype: tuple + + """ + return ( + ([utils.to_string(v) for v in row] for row in data), + [utils.to_string(h) for h in headers], + ) + + +def override_missing_value( + data, + headers, + style=None, + missing_value_token=Token.Output.Null, + missing_value="", + **_, +): + """Override missing values in the *data* with *missing_value*. + + A missing value is any value that is :data:`None`. + + :param iterable data: An :term:`iterable` (e.g. list) of rows. + :param iterable headers: The column headers. + :param style: Style for missing_value. + :param missing_value_token: The Pygments token used for missing data. + :param missing_value: The default value to use for missing data. + :return: The processed data and headers. + :rtype: tuple + + """ + + def fields(): + for row in data: + processed = [] + for field in row: + if field is None and style and HAS_PYGMENTS: + styled = utils.style_field( + missing_value_token, missing_value, style + ) + processed.append(styled) + elif field is None: + processed.append(missing_value) + else: + processed.append(field) + yield processed + + return (fields(), headers) + + +def override_tab_value(data, headers, new_value=" ", **_): + """Override tab values in the *data* with *new_value*. + + :param iterable data: An :term:`iterable` (e.g. list) of rows. + :param iterable headers: The column headers. + :param new_value: The new value to use for tab. + :return: The processed data and headers. + :rtype: tuple + + """ + return ( + ( + [v.replace("\t", new_value) if isinstance(v, text_type) else v for v in row] + for row in data + ), + headers, + ) + + +def escape_newlines(data, headers, **_): + """Escape newline characters (\n -> \\n, \r -> \\r) + + :param iterable data: An :term:`iterable` (e.g. list) of rows. + :param iterable headers: The column headers. + :return: The processed data and headers. + :rtype: tuple + + """ + return ( + ( + [ + v.replace("\r", r"\r").replace("\n", r"\n") + if isinstance(v, text_type) + else v + for v in row + ] + for row in data + ), + headers, + ) + + +def bytes_to_string(data, headers, **_): + """Convert all *data* and *headers* bytes to strings. + + Binary data that cannot be decoded is converted to a hexadecimal + representation via :func:`binascii.hexlify`. + + :param iterable data: An :term:`iterable` (e.g. list) of rows. + :param iterable headers: The column headers. + :return: The processed data and headers. + :rtype: tuple + + """ + return ( + ([utils.bytes_to_string(v) for v in row] for row in data), + [utils.bytes_to_string(h) for h in headers], + ) + + +def align_decimals(data, headers, column_types=(), **_): + """Align numbers in *data* on their decimal points. + + Whitespace padding is added before a number so that all numbers in a + column are aligned. + + Outputting data before aligning the decimals:: + + 1 + 2.1 + 10.59 + + Outputting data after aligning the decimals:: + + 1 + 2.1 + 10.59 + + :param iterable data: An :term:`iterable` (e.g. list) of rows. + :param iterable headers: The column headers. + :param iterable column_types: The columns' type objects (e.g. int or float). + :return: The processed data and headers. + :rtype: tuple + + """ + pointpos = len(headers) * [0] + data = list(data) + for row in data: + for i, v in enumerate(row): + if column_types[i] is float and type(v) in float_types: + v = text_type(v) + pointpos[i] = max(utils.intlen(v), pointpos[i]) + + def results(data): + for row in data: + result = [] + for i, v in enumerate(row): + if column_types[i] is float and type(v) in float_types: + v = text_type(v) + result.append((pointpos[i] - utils.intlen(v)) * " " + v) + else: + result.append(v) + yield result + + return results(data), headers + + +def quote_whitespaces(data, headers, quotestyle="'", **_): + """Quote leading/trailing whitespace in *data*. + + When outputing data with leading or trailing whitespace, it can be useful + to put quotation marks around the value so the whitespace is more + apparent. If one value in a column needs quoted, then all values in that + column are quoted to keep things consistent. + + .. NOTE:: + :data:`string.whitespace` is used to determine which characters are + whitespace. + + :param iterable data: An :term:`iterable` (e.g. list) of rows. + :param iterable headers: The column headers. + :param str quotestyle: The quotation mark to use (defaults to ``'``). + :return: The processed data and headers. + :rtype: tuple + + """ + whitespace = tuple(string.whitespace) + quote = len(headers) * [False] + data = list(data) + for row in data: + for i, v in enumerate(row): + v = text_type(v) + if v.startswith(whitespace) or v.endswith(whitespace): + quote[i] = True + + def results(data): + for row in data: + result = [] + for i, v in enumerate(row): + quotation = quotestyle if quote[i] else "" + result.append( + "{quotestyle}{value}{quotestyle}".format( + quotestyle=quotation, value=v + ) + ) + yield result + + return results(data), headers + + +def style_output( + data, + headers, + style=None, + header_token=Token.Output.Header, + odd_row_token=Token.Output.OddRow, + even_row_token=Token.Output.EvenRow, + **_, +): + """Style the *data* and *headers* (e.g. bold, italic, and colors) + + .. NOTE:: + This requires the `Pygments <http://pygments.org/>`_ library to + be installed. You can install it with CLI Helpers as an extra:: + $ pip install cli_helpers[styles] + + Example usage:: + + from cli_helpers.tabular_output.preprocessors import style_output + from pygments.style import Style + from pygments.token import Token + + class YourStyle(Style): + default_style = "" + styles = { + Token.Output.Header: 'bold ansibrightred', + Token.Output.OddRow: 'bg:#eee #111', + Token.Output.EvenRow: '#0f0' + } + + headers = ('First Name', 'Last Name') + data = [['Fred', 'Roberts'], ['George', 'Smith']] + + data, headers = style_output(data, headers, style=YourStyle) + + :param iterable data: An :term:`iterable` (e.g. list) of rows. + :param iterable headers: The column headers. + :param str/pygments.style.Style style: A Pygments style. You can `create + your own styles <https://pygments.org/docs/styles#creating-own-styles>`_. + :param str header_token: The token type to be used for the headers. + :param str odd_row_token: The token type to be used for odd rows. + :param str even_row_token: The token type to be used for even rows. + :return: The styled data and headers. + :rtype: tuple + + """ + from cli_helpers.utils import filter_style_table + + relevant_styles = filter_style_table( + style, header_token, odd_row_token, even_row_token + ) + if style and HAS_PYGMENTS: + if relevant_styles.get(header_token): + headers = [ + utils.style_field(header_token, header, style) for header in headers + ] + if relevant_styles.get(odd_row_token) or relevant_styles.get(even_row_token): + data = ( + [ + utils.style_field( + odd_row_token if i % 2 else even_row_token, f, style + ) + for f in r + ] + for i, r in enumerate(data, 1) + ) + + return iter(data), headers + + +def format_numbers( + data, headers, column_types=(), integer_format=None, float_format=None, **_ +): + """Format numbers according to a format specification. + + This uses Python's format specification to format numbers of the following + types: :class:`int`, :class:`py2:long` (Python 2), :class:`float`, and + :class:`~decimal.Decimal`. See the :ref:`python:formatspec` for more + information about the format strings. + + .. NOTE:: + A column is only formatted if all of its values are the same type + (except for :data:`None`). + + :param iterable data: An :term:`iterable` (e.g. list) of rows. + :param iterable headers: The column headers. + :param iterable column_types: The columns' type objects (e.g. int or float). + :param str integer_format: The format string to use for integer columns. + :param str float_format: The format string to use for float columns. + :return: The processed data and headers. + :rtype: tuple + + """ + if (integer_format is None and float_format is None) or not column_types: + return iter(data), headers + + def _format_number(field, column_type): + if integer_format and column_type is int and type(field) in int_types: + return format(field, integer_format) + elif float_format and column_type is float and type(field) in float_types: + return format(field, float_format) + return field + + data = ( + [_format_number(v, column_types[i]) for i, v in enumerate(row)] for row in data + ) + return data, headers diff --git a/cli_helpers/tabular_output/tabulate_adapter.py b/cli_helpers/tabular_output/tabulate_adapter.py new file mode 100644 index 0000000..2c557f8 --- /dev/null +++ b/cli_helpers/tabular_output/tabulate_adapter.py @@ -0,0 +1,212 @@ +# -*- coding: utf-8 -*- +"""Format adapter for the tabulate module.""" + +from __future__ import unicode_literals + +from cli_helpers.utils import filter_dict_by_key +from cli_helpers.compat import Terminal256Formatter, Token, StringIO +from .preprocessors import ( + convert_to_string, + truncate_string, + override_missing_value, + style_output, + HAS_PYGMENTS, + escape_newlines, +) + +import tabulate + + +tabulate.MIN_PADDING = 0 + +tabulate._table_formats["psql_unicode"] = tabulate.TableFormat( + lineabove=tabulate.Line("┌", "─", "┬", "┐"), + linebelowheader=tabulate.Line("├", "─", "┼", "┤"), + linebetweenrows=None, + linebelow=tabulate.Line("└", "─", "┴", "┘"), + headerrow=tabulate.DataRow("│", "│", "│"), + datarow=tabulate.DataRow("│", "│", "│"), + padding=1, + with_header_hide=None, +) + +tabulate._table_formats["double"] = tabulate.TableFormat( + lineabove=tabulate.Line("╔", "═", "╦", "╗"), + linebelowheader=tabulate.Line("╠", "═", "╬", "╣"), + linebetweenrows=None, + linebelow=tabulate.Line("╚", "═", "╩", "╝"), + headerrow=tabulate.DataRow("║", "║", "║"), + datarow=tabulate.DataRow("║", "║", "║"), + padding=1, + with_header_hide=None, +) + +tabulate._table_formats["ascii"] = tabulate.TableFormat( + lineabove=tabulate.Line("+", "-", "+", "+"), + linebelowheader=tabulate.Line("+", "-", "+", "+"), + linebetweenrows=None, + linebelow=tabulate.Line("+", "-", "+", "+"), + headerrow=tabulate.DataRow("|", "|", "|"), + datarow=tabulate.DataRow("|", "|", "|"), + padding=1, + with_header_hide=None, +) + +tabulate._table_formats["ascii_escaped"] = tabulate.TableFormat( + lineabove=tabulate.Line("+", "-", "+", "+"), + linebelowheader=tabulate.Line("+", "-", "+", "+"), + linebetweenrows=None, + linebelow=tabulate.Line("+", "-", "+", "+"), + headerrow=tabulate.DataRow("|", "|", "|"), + datarow=tabulate.DataRow("|", "|", "|"), + padding=1, + with_header_hide=None, +) + +# "minimal" is the same as "plain", but without headers +tabulate._table_formats["minimal"] = tabulate._table_formats["plain"] + +tabulate.multiline_formats["psql_unicode"] = "psql_unicode" +tabulate.multiline_formats["double"] = "double" +tabulate.multiline_formats["ascii"] = "ascii" +tabulate.multiline_formats["minimal"] = "minimal" + +supported_markup_formats = ( + "mediawiki", + "html", + "latex", + "latex_booktabs", + "textile", + "moinmoin", + "jira", +) +supported_table_formats = ( + "ascii", + "ascii_escaped", + "plain", + "simple", + "minimal", + "grid", + "fancy_grid", + "pipe", + "orgtbl", + "psql", + "psql_unicode", + "rst", + "github", + "double", +) + +supported_formats = supported_markup_formats + supported_table_formats + +default_kwargs = { + "ascii": {"numalign": "left"}, + "ascii_escaped": {"numalign": "left"}, +} +headless_formats = ("minimal",) + + +def get_preprocessors(format_name): + common_formatters = ( + override_missing_value, + convert_to_string, + truncate_string, + style_output, + ) + + if tabulate.multiline_formats.get(format_name): + return common_formatters + (style_output_table(format_name),) + else: + return common_formatters + (escape_newlines, style_output_table(format_name)) + + +def style_output_table(format_name=""): + def style_output( + data, + headers, + style=None, + table_separator_token=Token.Output.TableSeparator, + **_, + ): + """Style the *table* a(e.g. bold, italic, and colors) + + .. NOTE:: + This requires the `Pygments <http://pygments.org/>`_ library to + be installed. You can install it with CLI Helpers as an extra:: + $ pip install cli_helpers[styles] + + Example usage:: + + from cli_helpers.tabular_output import tabulate_adapter + from pygments.style import Style + from pygments.token import Token + + class YourStyle(Style): + default_style = "" + styles = { + Token.Output.TableSeparator: '#ansigray' + } + + headers = ('First Name', 'Last Name') + data = [['Fred', 'Roberts'], ['George', 'Smith']] + style_output_table = tabulate_adapter.style_output_table('psql') + style_output_table(data, headers, style=CliStyle) + + data, headers = style_output(data, headers, style=YourStyle) + output = tabulate_adapter.adapter(data, headers, style=YourStyle) + + :param iterable data: An :term:`iterable` (e.g. list) of rows. + :param iterable headers: The column headers. + :param str/pygments.style.Style style: A Pygments style. You can `create + your own styles <https://pygments.org/docs/styles#creating-own-styles>`_. + :param str table_separator_token: The token type to be used for the table separator. + :return: data and headers. + :rtype: tuple + + """ + if style and HAS_PYGMENTS and format_name in supported_table_formats: + formatter = Terminal256Formatter(style=style) + + def style_field(token, field): + """Get the styled text for a *field* using *token* type.""" + s = StringIO() + formatter.format(((token, field),), s) + return s.getvalue() + + def addColorInElt(elt): + if not elt: + return elt + if elt.__class__ == tabulate.Line: + return tabulate.Line( + *(style_field(table_separator_token, val) for val in elt) + ) + if elt.__class__ == tabulate.DataRow: + return tabulate.DataRow( + *(style_field(table_separator_token, val) for val in elt) + ) + return elt + + srcfmt = tabulate._table_formats[format_name] + newfmt = tabulate.TableFormat(*(addColorInElt(val) for val in srcfmt)) + tabulate._table_formats[format_name] = newfmt + + return iter(data), headers + + return style_output + + +def adapter(data, headers, table_format=None, preserve_whitespace=False, **kwargs): + """Wrap tabulate inside a function for TabularOutputFormatter.""" + keys = ("floatfmt", "numalign", "stralign", "showindex", "disable_numparse") + tkwargs = {"tablefmt": table_format} + tkwargs.update(filter_dict_by_key(kwargs, keys)) + + if table_format in supported_markup_formats: + tkwargs.update(numalign=None, stralign=None) + + tabulate.PRESERVE_WHITESPACE = preserve_whitespace + + tkwargs.update(default_kwargs.get(table_format, {})) + if table_format in headless_formats: + headers = [] + return iter(tabulate.tabulate(data, headers, **tkwargs).split("\n")) diff --git a/cli_helpers/tabular_output/tsv_output_adapter.py b/cli_helpers/tabular_output/tsv_output_adapter.py new file mode 100644 index 0000000..75518b3 --- /dev/null +++ b/cli_helpers/tabular_output/tsv_output_adapter.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- +"""A tsv data output adapter""" + +from __future__ import unicode_literals + +from .preprocessors import bytes_to_string, override_missing_value, convert_to_string +from itertools import chain +from cli_helpers.utils import replace + +supported_formats = ("tsv",) +preprocessors = (override_missing_value, bytes_to_string, convert_to_string) + + +def adapter(data, headers, **kwargs): + """Wrap the formatting inside a function for TabularOutputFormatter.""" + for row in chain((headers,), data): + yield "\t".join((replace(r, (("\n", r"\n"), ("\t", r"\t"))) for r in row)) diff --git a/cli_helpers/tabular_output/vertical_table_adapter.py b/cli_helpers/tabular_output/vertical_table_adapter.py new file mode 100644 index 0000000..0b96cb2 --- /dev/null +++ b/cli_helpers/tabular_output/vertical_table_adapter.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +"""Format data into a vertical table layout.""" + +from __future__ import unicode_literals + +from cli_helpers.utils import filter_dict_by_key +from .preprocessors import convert_to_string, override_missing_value, style_output + +supported_formats = ("vertical",) +preprocessors = (override_missing_value, convert_to_string, style_output) + + +def _get_separator(num, sep_title, sep_character, sep_length): + """Get a row separator for row *num*.""" + left_divider_length = right_divider_length = sep_length + if isinstance(sep_length, tuple): + left_divider_length, right_divider_length = sep_length + left_divider = sep_character * left_divider_length + right_divider = sep_character * right_divider_length + title = sep_title.format(n=num + 1) + + return "{left_divider}[ {title} ]{right_divider}\n".format( + left_divider=left_divider, right_divider=right_divider, title=title + ) + + +def _format_row(headers, row): + """Format a row.""" + formatted_row = [" | ".join(field) for field in zip(headers, row)] + return "\n".join(formatted_row) + + +def vertical_table( + data, headers, sep_title="{n}. row", sep_character="*", sep_length=27 +): + """Format *data* and *headers* as an vertical table. + + The values in *data* and *headers* must be strings. + + :param iterable data: An :term:`iterable` (e.g. list) of rows. + :param iterable headers: The column headers. + :param str sep_title: The title given to each row separator. Defaults to + ``'{n}. row'``. Any instance of ``'{n}'`` is + replaced by the record number. + :param str sep_character: The character used to separate rows. Defaults to + ``'*'``. + :param int/tuple sep_length: The number of separator characters that should + appear on each side of the *sep_title*. Use + a tuple to specify the left and right values + separately. + :return: The formatted data. + :rtype: str + + """ + header_len = max([len(x) for x in headers]) + padded_headers = [x.ljust(header_len) for x in headers] + formatted_rows = [_format_row(padded_headers, row) for row in data] + + output = [] + for i, result in enumerate(formatted_rows): + yield _get_separator(i, sep_title, sep_character, sep_length) + result + + +def adapter(data, headers, **kwargs): + """Wrap vertical table in a function for TabularOutputFormatter.""" + keys = ("sep_title", "sep_character", "sep_length") + return vertical_table(data, headers, **filter_dict_by_key(kwargs, keys)) diff --git a/cli_helpers/utils.py b/cli_helpers/utils.py new file mode 100644 index 0000000..053bdea --- /dev/null +++ b/cli_helpers/utils.py @@ -0,0 +1,114 @@ +# -*- coding: utf-8 -*- +"""Various utility functions and helpers.""" + +import binascii +import re +from functools import lru_cache +from typing import Dict + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from pygments.style import StyleMeta + +from cli_helpers.compat import binary_type, text_type, Terminal256Formatter, StringIO + + +def bytes_to_string(b): + """Convert bytes *b* to a string. + + Hexlify bytes that can't be decoded. + + """ + if isinstance(b, binary_type): + needs_hex = False + try: + result = b.decode("utf8") + needs_hex = not result.isprintable() + except UnicodeDecodeError: + needs_hex = True + if needs_hex: + return "0x" + binascii.hexlify(b).decode("ascii") + else: + return result + return b + + +def to_string(value): + """Convert *value* to a string.""" + if isinstance(value, binary_type): + return bytes_to_string(value) + else: + return text_type(value) + + +def truncate_string(value, max_width=None, skip_multiline_string=True): + """Truncate string values.""" + if skip_multiline_string and isinstance(value, text_type) and "\n" in value: + return value + elif ( + isinstance(value, text_type) + and max_width is not None + and len(value) > max_width + ): + return value[: max_width - 3] + "..." + return value + + +def intlen(n): + """Find the length of the integer part of a number *n*.""" + pos = n.find(".") + return len(n) if pos < 0 else pos + + +def filter_dict_by_key(d, keys): + """Filter the dict *d* to remove keys not in *keys*.""" + return {k: v for k, v in d.items() if k in keys} + + +def unique_items(seq): + """Return the unique items from iterable *seq* (in order).""" + seen = set() + return [x for x in seq if not (x in seen or seen.add(x))] + + +_ansi_re = re.compile("\033\\[((?:\\d|;)*)([a-zA-Z])") + + +def strip_ansi(value): + """Strip the ANSI escape sequences from a string.""" + return _ansi_re.sub("", value) + + +def replace(s, replace): + """Replace multiple values in a string""" + for r in replace: + s = s.replace(*r) + return s + + +@lru_cache() +def _get_formatter(style) -> Terminal256Formatter: + return Terminal256Formatter(style=style) + + +def style_field(token, field, style): + """Get the styled text for a *field* using *token* type.""" + formatter = _get_formatter(style) + s = StringIO() + formatter.format(((token, field),), s) + return s.getvalue() + + +def filter_style_table(style: "StyleMeta", *relevant_styles: str) -> Dict: + """ + get a dictionary of styles for given tokens. Typical usage: + + filter_style_table(style, Token.Output.EvenRow, Token.Output.OddRow) == { + Token.Output.EvenRow: "", + Token.Output.OddRow: "", + } + """ + _styles_iter = ((key, val) for key, val in getattr(style, "styles", {}).items()) + _relevant_styles_iter = filter(lambda tpl: tpl[0] in relevant_styles, _styles_iter) + return {key: val for key, val in _relevant_styles_iter} |