diff options
-rw-r--r-- | .coveragerc | 33 | ||||
-rw-r--r-- | .gitignore | 6 | ||||
-rw-r--r-- | .pre-commit-config.yaml | 38 | ||||
-rw-r--r-- | LICENSE | 19 | ||||
-rw-r--r-- | README.md | 282 | ||||
-rw-r--r-- | azure-pipelines.yml | 20 | ||||
-rw-r--r-- | cfgv.py | 408 | ||||
-rw-r--r-- | requirements-dev.txt | 3 | ||||
-rw-r--r-- | setup.cfg | 27 | ||||
-rw-r--r-- | setup.py | 2 | ||||
-rw-r--r-- | tests/__init__.py | 0 | ||||
-rw-r--r-- | tests/cfgv_test.py | 680 | ||||
-rw-r--r-- | tox.ini | 18 |
13 files changed, 1536 insertions, 0 deletions
diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..0bada6e --- /dev/null +++ b/.coveragerc @@ -0,0 +1,33 @@ +[run] +branch = True +source = + . +omit = + .tox/* + /usr/* + setup.py + # Don't complain if non-runnable code isn't run + */__main__.py + +[report] +show_missing = True +skip_covered = True +exclude_lines = + # Have to re-enable the standard pragma + \#\s*pragma: no cover + # We optionally substitute this + ${COVERAGE_IGNORE_WINDOWS} + + # Don't complain if tests don't hit defensive assertion code: + ^\s*raise AssertionError\b + ^\s*raise NotImplementedError\b + ^\s*return NotImplemented\b + ^\s*raise$ + + # Don't complain if non-runnable code isn't run: + ^if __name__ == ['"]__main__['"]:$ + +[html] +directory = coverage-html + +# vim:ft=dosini diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4b9703a --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +*.egg-info +*.pyc +/.pytest_cache +/.coverage +/.tox +/venv* diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..948de77 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,38 @@ +repos: +- repo: https://github.com/pre-commit/pre-commit-hooks + rev: v2.5.0 + hooks: + - id: trailing-whitespace + - id: end-of-file-fixer + - id: check-docstring-first + - id: check-yaml + - id: debug-statements + - id: name-tests-test + - id: requirements-txt-fixer +- repo: https://gitlab.com/pycqa/flake8 + rev: 3.7.9 + hooks: + - id: flake8 +- repo: https://github.com/pre-commit/mirrors-autopep8 + rev: v1.5 + hooks: + - id: autopep8 +- repo: https://github.com/asottile/reorder_python_imports + rev: v1.9.0 + hooks: + - id: reorder-python-imports + args: [--py3-plus] +- repo: https://github.com/asottile/pyupgrade + rev: v1.26.2 + hooks: + - id: pyupgrade + args: [--py36-plus] +- repo: https://github.com/asottile/add-trailing-comma + rev: v1.5.0 + hooks: + - id: add-trailing-comma + args: [--py36-plus] +- repo: https://github.com/asottile/setup-cfg-fmt + rev: v1.6.0 + hooks: + - id: setup-cfg-fmt @@ -0,0 +1,19 @@ +Copyright (c) 2018 Anthony Sottile + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..2b8f227 --- /dev/null +++ b/README.md @@ -0,0 +1,282 @@ +[![Build Status](https://dev.azure.com/asottile/asottile/_apis/build/status/asottile.cfgv?branchName=master)](https://dev.azure.com/asottile/asottile/_build/latest?definitionId=24&branchName=master) +[![Azure DevOps coverage](https://img.shields.io/azure-devops/coverage/asottile/asottile/24/master.svg)](https://dev.azure.com/asottile/asottile/_build/latest?definitionId=24&branchName=master) + +cfgv +==== + +Validate configuration and produce human readable error messages. + +## Installation + +`pip install cfgv` + +## Sample error messages + +These are easier to see by example. Here's an example where I typo'd `true` +in a [pre-commit](https://pre-commit.com) configuration. + +``` +pre_commit.clientlib.InvalidConfigError: +==> File /home/asottile/workspace/pre-commit/.pre-commit-config.yaml +==> At Config() +==> At key: repos +==> At Repository(repo='https://github.com/pre-commit/pre-commit-hooks') +==> At key: hooks +==> At Hook(id='flake8') +==> At key: always_run +=====> Expected bool got str +``` + +## API + +### `cfgv.validate(value, schema)` + +Perform validation on the schema: +- raises `ValidationError` on failure +- returns the value on success (for convenience) + +### `cfgv.apply_defaults(value, schema)` + +Returns a new value which sets all missing optional values to their defaults. + +### `cfgv.remove_defaults(value, schema)` + +Returns a new value which removes all optional values that are set to their +defaults. + +### `cfgv.load_from_filename(filename, schema, load_strategy, exc_tp=ValidationError)` + +Load a file given the `load_strategy`. Reraise any errors as `exc_tp`. All +defaults will be populated in the resulting value. + +Most useful when used with `functools.partial` as follows: + +```python +load_my_cfg = functools.partial( + cfgv.load_from_filename, + schema=MY_SCHEMA, + load_strategy=json.loads, + exc_tp=MyError, +) +``` + +## Making a schema + +A schema validates a container -- `cfgv` provides `Map` and `Array` for +most normal cases. + +### writing your own schema container + +If the built-in containers below don't quite satisfy your usecase, you can +always write your own. Containers use the following interface: + +```python +class Container(object): + def check(self, v): + """check the passed in value (do not modify `v`)""" + + def apply_defaults(self, v): + """return a new value with defaults applied (do not modify `v`)""" + + def remove_defaults(self, v): + """return a new value with defaults removed (do not modify `v`)""" +``` + +### `Map(object_name, id_key, *items)` + +The most basic building block for creating a schema is a `Map` + +- `object_name`: will be displayed in error messages +- `id_key`: will be used to identify the object in error messages. Set to + `None` if there is no identifying key for the object. +- `items`: validator objects such as `Required` or `Optional` + +Consider the following schema: + +```python +Map( + 'Repo', 'url', + Required('url', check_any), +) +``` + +In an error message, the map may be displayed as: + +- `Repo(url='https://github.com/pre-commit/pre-commit')` +- `Repo(url=MISSING)` (if the key is not present) + +### `Array(of, allow_empty=True)` + +Used to nest maps inside of arrays. For arrays of scalars, see `check_array`. + +- `of`: A `Map` / `Array` or other sub-schema. +- `allow_empty`: when `False`, `Array` will ensure at least one element. + +When validated, this will check that each element adheres to the sub-schema. + +## Validator objects + +Validator objects are used to validate key-value-pairs of a `Map`. + +### writing your own validator + +If the built-in validators below don't quite satisfy your usecase, you can +always write your own. Validators use the following interface: + +```python +class Validator(object): + def check(self, dct): + """check that your specific key has the appropriate value in `dct`""" + + def apply_default(self, dct): + """modify `dct` and set the default value if it is missing""" + + def remove_default(self, dct): + """modify `dct` and remove the default value if it is present""" +``` + +It may make sense to _borrow_ functions from the built in validators. They +additionally use the following interface(s): + +- `self.key`: the key to check +- `self.check_fn`: the [check function](#check-functions) +- `self.default`: a default value to set. + +### `Required(key, check_fn)` + +Ensure that a key is present in a `Map` and adheres to the +[check function](#check-functions). + +### `RequiredRecurse(key, schema)` + +Similar to `Required`, but uses a [schema](#making-a-schema). + +### `Optional(key, check_fn, default)` + +If a key is present, check that it adheres to the +[check function](#check-functions). + +- `apply_defaults` will set the `default` if it is not present. +- `remove_defaults` will remove the value if it is equal to `default`. + +### `OptionalRecurse(key, schema, default)` + +Similar to `Optional` but uses a [schema](#making-a-schema). + +- `apply_defaults` will set the `default` if it is not present and then + validate it with the schema. +- `remove_defaults` will remove defaults using the schema, and then remove the + value it if it is equal to `default`. + +### `OptionalNoDefault(key, check_fn)` + +Like `Optional`, but does not `apply_defaults` or `remove_defaults`. + +### `Conditional(key, check_fn, condition_key, condition_value, ensure_absent=False)` + +- If `condition_key` is equal to the `condition_value`, the specific `key` +will be checked using the [check function](#check-functions). +- If `ensure_absent` is `True` and the condition check fails, the `key` will +be checked for absense. + +Note that the `condition_value` is checked for equality, so any object +implementing `__eq__` may be used. A few are provided out of the box +for this purpose, see [equality helpers](#equality-helpers). + +### `ConditionalOptional(key, check_fn, default, condition_key, condition_value, ensure_absent=False)` + +Similar to ``Conditional`` and ``Optional``. + +### `ConditionalRecurse(key, schema, condition_key, condition_value, ensure_absent=True)` + +Similar to `Conditional`, but uses a [schema](#making-a-schema). + +### `NoAdditionalKeys(keys)` + +Use in a mapping to ensure that only the `keys` specified are present. + +## Equality helpers + +Equality helpers at the very least implement `__eq__` for their behaviour. + +They may also implement `def describe_opposite(self):` for use in the +`ensure_absent=True` error message (otherwise, the `__repr__` will be used). + +### `Not(val)` + +Returns `True` if the value is not equal to `val`. + +### `In(*values)` + +Returns `True` if the value is contained in `values`. + +### `NotIn(*values)` + +Returns `True` if the value is not contained in `values`. + +## Check functions + +A number of check functions are provided out of the box. + +A check function takes a single parameter, the `value`, and either raises a +`ValidationError` or returns nothing. + +### `check_any(_)` + +A noop check function. + +### `check_type(tp, typename=None)` + +Returns a check function to check for a specific type. Setting `typename` +will replace the type's name in the error message. + +For example: + +```python +Required('key', check_type(int)) +# 'Expected bytes' in both python2 and python3. +Required('key', check_type(bytes, typename='bytes')) +``` + +Several type checking functions are provided out of the box: + +- `check_bool` +- `check_bytes` +- `check_int` +- `check_string` +- `check_text` + +### `check_one_of(possible)` + +Returns a function that checks that the value is contained in `possible`. + +For example: + +```python +Required('language', check_one_of(('javascript', 'python', 'ruby'))) +``` + +### `check_regex(v)` + +Ensures that `v` is a valid python regular expression. + +### `check_array(inner_check)` + +Returns a function that checks that a value is a sequence and that each +value in that sequence adheres to the `inner_check`. + +For example: + +```python +Required('args', check_array(check_string)) +``` + +### `check_and(*fns)` + +Returns a function that performs multiple checks on a value. + +For example: + +```python +Required('language', check_and(check_string, my_check_language)) +``` diff --git a/azure-pipelines.yml b/azure-pipelines.yml new file mode 100644 index 0000000..ebca7d2 --- /dev/null +++ b/azure-pipelines.yml @@ -0,0 +1,20 @@ +trigger: + branches: + include: [master, test-me-*] + tags: + include: ['*'] + +resources: + repositories: + - repository: asottile + type: github + endpoint: github + name: asottile/azure-pipeline-templates + ref: refs/tags/v1.0.1 + +jobs: +- template: job--pre-commit.yml@asottile +- template: job--python-tox.yml@asottile + parameters: + toxenvs: [pypy3, py36, py37, py38] + os: linux @@ -0,0 +1,408 @@ +import collections +import contextlib +import os.path +import re +import sys + + +class ValidationError(ValueError): + def __init__(self, error_msg, ctx=None): + super().__init__(error_msg) + self.error_msg = error_msg + self.ctx = ctx + + def __str__(self): + out = '\n' + err = self + while err.ctx is not None: + out += f'==> {err.ctx}\n' + err = err.error_msg + out += f'=====> {err.error_msg}' + return out + + +MISSING = collections.namedtuple('Missing', ())() +type(MISSING).__repr__ = lambda self: 'MISSING' + + +@contextlib.contextmanager +def validate_context(msg): + try: + yield + except ValidationError as e: + _, _, tb = sys.exc_info() + raise ValidationError(e, ctx=msg).with_traceback(tb) + + +@contextlib.contextmanager +def reraise_as(tp): + try: + yield + except ValidationError as e: + _, _, tb = sys.exc_info() + raise tp(e).with_traceback(tb) + + +def _dct_noop(self, dct): + pass + + +def _check_optional(self, dct): + if self.key not in dct: + return + with validate_context(f'At key: {self.key}'): + self.check_fn(dct[self.key]) + + +def _apply_default_optional(self, dct): + dct.setdefault(self.key, self.default) + + +def _remove_default_optional(self, dct): + if dct.get(self.key, MISSING) == self.default: + del dct[self.key] + + +def _require_key(self, dct): + if self.key not in dct: + raise ValidationError(f'Missing required key: {self.key}') + + +def _check_required(self, dct): + _require_key(self, dct) + _check_optional(self, dct) + + +@property +def _check_fn_recurse(self): + def check_fn(val): + validate(val, self.schema) + return check_fn + + +def _apply_default_required_recurse(self, dct): + dct[self.key] = apply_defaults(dct[self.key], self.schema) + + +def _remove_default_required_recurse(self, dct): + dct[self.key] = remove_defaults(dct[self.key], self.schema) + + +def _apply_default_optional_recurse(self, dct): + if self.key not in dct: + _apply_default_optional(self, dct) + _apply_default_required_recurse(self, dct) + + +def _remove_default_optional_recurse(self, dct): + if self.key in dct: + _remove_default_required_recurse(self, dct) + _remove_default_optional(self, dct) + + +def _get_check_conditional(inner): + def _check_conditional(self, dct): + if dct.get(self.condition_key, MISSING) == self.condition_value: + inner(self, dct) + elif ( + self.condition_key in dct and + self.ensure_absent and self.key in dct + ): + if hasattr(self.condition_value, 'describe_opposite'): + explanation = self.condition_value.describe_opposite() + else: + explanation = f'is not {self.condition_value!r}' + raise ValidationError( + f'Expected {self.key} to be absent when {self.condition_key} ' + f'{explanation}, found {self.key}: {dct[self.key]!r}', + ) + return _check_conditional + + +def _apply_default_conditional_optional(self, dct): + if dct.get(self.condition_key, MISSING) == self.condition_value: + _apply_default_optional(self, dct) + + +def _remove_default_conditional_optional(self, dct): + if dct.get(self.condition_key, MISSING) == self.condition_value: + _remove_default_optional(self, dct) + + +def _apply_default_conditional_recurse(self, dct): + if dct.get(self.condition_key, MISSING) == self.condition_value: + _apply_default_required_recurse(self, dct) + + +def _remove_default_conditional_recurse(self, dct): + if dct.get(self.condition_key, MISSING) == self.condition_value: + _remove_default_required_recurse(self, dct) + + +def _no_additional_keys_check(self, dct): + extra = sorted(set(dct) - set(self.keys)) + if extra: + extra_s = ', '.join(str(x) for x in extra) + keys_s = ', '.join(str(x) for x in self.keys) + raise ValidationError( + f'Additional keys found: {extra_s}. ' + f'Only these keys are allowed: {keys_s}', + ) + + +def _warn_additional_keys_check(self, dct): + extra = sorted(set(dct) - set(self.keys)) + if extra: + self.callback(extra, self.keys, dct) + + +Required = collections.namedtuple('Required', ('key', 'check_fn')) +Required.check = _check_required +Required.apply_default = _dct_noop +Required.remove_default = _dct_noop +RequiredRecurse = collections.namedtuple('RequiredRecurse', ('key', 'schema')) +RequiredRecurse.check = _check_required +RequiredRecurse.check_fn = _check_fn_recurse +RequiredRecurse.apply_default = _apply_default_required_recurse +RequiredRecurse.remove_default = _remove_default_required_recurse +Optional = collections.namedtuple('Optional', ('key', 'check_fn', 'default')) +Optional.check = _check_optional +Optional.apply_default = _apply_default_optional +Optional.remove_default = _remove_default_optional +OptionalRecurse = collections.namedtuple( + 'OptionalRecurse', ('key', 'schema', 'default'), +) +OptionalRecurse.check = _check_optional +OptionalRecurse.check_fn = _check_fn_recurse +OptionalRecurse.apply_default = _apply_default_optional_recurse +OptionalRecurse.remove_default = _remove_default_optional_recurse +OptionalNoDefault = collections.namedtuple( + 'OptionalNoDefault', ('key', 'check_fn'), +) +OptionalNoDefault.check = _check_optional +OptionalNoDefault.apply_default = _dct_noop +OptionalNoDefault.remove_default = _dct_noop +Conditional = collections.namedtuple( + 'Conditional', + ('key', 'check_fn', 'condition_key', 'condition_value', 'ensure_absent'), +) +Conditional.__new__.__defaults__ = (False,) +Conditional.check = _get_check_conditional(_check_required) +Conditional.apply_default = _dct_noop +Conditional.remove_default = _dct_noop +ConditionalOptional = collections.namedtuple( + 'ConditionalOptional', + ( + 'key', 'check_fn', 'default', 'condition_key', 'condition_value', + 'ensure_absent', + ), +) +ConditionalOptional.__new__.__defaults__ = (False,) +ConditionalOptional.check = _get_check_conditional(_check_optional) +ConditionalOptional.apply_default = _apply_default_conditional_optional +ConditionalOptional.remove_default = _remove_default_conditional_optional +ConditionalRecurse = collections.namedtuple( + 'ConditionalRecurse', + ('key', 'schema', 'condition_key', 'condition_value', 'ensure_absent'), +) +ConditionalRecurse.__new__.__defaults__ = (False,) +ConditionalRecurse.check = _get_check_conditional(_check_required) +ConditionalRecurse.check_fn = _check_fn_recurse +ConditionalRecurse.apply_default = _apply_default_conditional_recurse +ConditionalRecurse.remove_default = _remove_default_conditional_recurse +NoAdditionalKeys = collections.namedtuple('NoAdditionalKeys', ('keys',)) +NoAdditionalKeys.check = _no_additional_keys_check +NoAdditionalKeys.apply_default = _dct_noop +NoAdditionalKeys.remove_default = _dct_noop +WarnAdditionalKeys = collections.namedtuple( + 'WarnAdditionalKeys', ('keys', 'callback'), +) +WarnAdditionalKeys.check = _warn_additional_keys_check +WarnAdditionalKeys.apply_default = _dct_noop +WarnAdditionalKeys.remove_default = _dct_noop + + +class Map(collections.namedtuple('Map', ('object_name', 'id_key', 'items'))): + __slots__ = () + + def __new__(cls, object_name, id_key, *items): + return super().__new__(cls, object_name, id_key, items) + + def check(self, v): + if not isinstance(v, dict): + raise ValidationError( + f'Expected a {self.object_name} map but got a ' + f'{type(v).__name__}', + ) + if self.id_key is None: + context = f'At {self.object_name}()' + else: + key_v_s = v.get(self.id_key, MISSING) + context = f'At {self.object_name}({self.id_key}={key_v_s!r})' + with validate_context(context): + for item in self.items: + item.check(v) + + def apply_defaults(self, v): + ret = v.copy() + for item in self.items: + item.apply_default(ret) + return ret + + def remove_defaults(self, v): + ret = v.copy() + for item in self.items: + item.remove_default(ret) + return ret + + +class Array(collections.namedtuple('Array', ('of', 'allow_empty'))): + __slots__ = () + + def __new__(cls, of, allow_empty=True): + return super().__new__(cls, of=of, allow_empty=allow_empty) + + def check(self, v): + check_array(check_any)(v) + if not self.allow_empty and not v: + raise ValidationError( + f"Expected at least 1 '{self.of.object_name}'", + ) + for val in v: + validate(val, self.of) + + def apply_defaults(self, v): + return [apply_defaults(val, self.of) for val in v] + + def remove_defaults(self, v): + return [remove_defaults(val, self.of) for val in v] + + +class Not(collections.namedtuple('Not', ('val',))): + __slots__ = () + + def describe_opposite(self): + return f'is {self.val!r}' + + def __eq__(self, other): + return other is not MISSING and other != self.val + + +class NotIn(collections.namedtuple('NotIn', ('values',))): + __slots__ = () + + def __new__(cls, *values): + return super().__new__(cls, values=values) + + def describe_opposite(self): + return f'is any of {self.values!r}' + + def __eq__(self, other): + return other is not MISSING and other not in self.values + + +class In(collections.namedtuple('In', ('values',))): + __slots__ = () + + def __new__(cls, *values): + return super().__new__(cls, values=values) + + def describe_opposite(self): + return f'is not any of {self.values!r}' + + def __eq__(self, other): + return other is not MISSING and other in self.values + + +def check_any(_): + pass + + +def check_type(tp, typename=None): + def check_type_fn(v): + if not isinstance(v, tp): + typename_s = typename or tp.__name__ + raise ValidationError( + f'Expected {typename_s} got {type(v).__name__}', + ) + return check_type_fn + + +check_bool = check_type(bool) +check_bytes = check_type(bytes) +check_int = check_type(int) +check_string = check_type(str, typename='string') +check_text = check_type(str, typename='text') + + +def check_one_of(possible): + def check_one_of_fn(v): + if v not in possible: + possible_s = ', '.join(str(x) for x in sorted(possible)) + raise ValidationError( + f'Expected one of {possible_s} but got: {v!r}', + ) + return check_one_of_fn + + +def check_regex(v): + try: + re.compile(v) + except re.error: + raise ValidationError(f'{v!r} is not a valid python regex') + + +def check_array(inner_check): + def check_array_fn(v): + if not isinstance(v, (list, tuple)): + raise ValidationError( + f'Expected array but got {type(v).__name__!r}', + ) + + for i, val in enumerate(v): + with validate_context(f'At index {i}'): + inner_check(val) + return check_array_fn + + +def check_and(*fns): + def check(v): + for fn in fns: + fn(v) + return check + + +def validate(v, schema): + schema.check(v) + return v + + +def apply_defaults(v, schema): + return schema.apply_defaults(v) + + +def remove_defaults(v, schema): + return schema.remove_defaults(v) + + +def load_from_filename( + filename, + schema, + load_strategy, + exc_tp=ValidationError, +): + with reraise_as(exc_tp): + if not os.path.exists(filename): + raise ValidationError(f'{filename} does not exist') + + with open(filename, encoding='utf-8') as f: + contents = f.read() + + with validate_context(f'File {filename}'): + try: + data = load_strategy(contents) + except Exception as e: + raise ValidationError(str(e)) + + validate(data, schema) + return apply_defaults(data, schema) diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..4dbfffa --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,3 @@ +coverage +pre-commit +pytest diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..ce6e897 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,27 @@ +[metadata] +name = cfgv +version = 3.1.0 +description = Validate configuration and produce human readable error messages. +long_description = file: README.md +long_description_content_type = text/markdown +url = https://github.com/asottile/cfgv +author = Anthony Sottile +author_email = asottile@umich.edu +license = MIT +license_file = LICENSE +classifiers = + License :: OSI Approved :: MIT License + Programming Language :: Python :: 3 + Programming Language :: Python :: 3 :: Only + Programming Language :: Python :: 3.6 + Programming Language :: Python :: 3.7 + Programming Language :: Python :: 3.8 + Programming Language :: Python :: Implementation :: CPython + Programming Language :: Python :: Implementation :: PyPy + +[options] +py_modules = cfgv +python_requires = >=3.6.1 + +[bdist_wheel] +universal = True diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..8bf1ba9 --- /dev/null +++ b/setup.py @@ -0,0 +1,2 @@ +from setuptools import setup +setup() diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/__init__.py diff --git a/tests/cfgv_test.py b/tests/cfgv_test.py new file mode 100644 index 0000000..1b143a6 --- /dev/null +++ b/tests/cfgv_test.py @@ -0,0 +1,680 @@ +import json +from unittest import mock + +import pytest + +from cfgv import apply_defaults +from cfgv import Array +from cfgv import check_and +from cfgv import check_any +from cfgv import check_array +from cfgv import check_bool +from cfgv import check_one_of +from cfgv import check_regex +from cfgv import check_type +from cfgv import Conditional +from cfgv import ConditionalOptional +from cfgv import ConditionalRecurse +from cfgv import In +from cfgv import load_from_filename +from cfgv import Map +from cfgv import MISSING +from cfgv import NoAdditionalKeys +from cfgv import Not +from cfgv import NotIn +from cfgv import Optional +from cfgv import OptionalNoDefault +from cfgv import OptionalRecurse +from cfgv import remove_defaults +from cfgv import Required +from cfgv import RequiredRecurse +from cfgv import validate +from cfgv import ValidationError +from cfgv import WarnAdditionalKeys + + +def _assert_exception_trace(e, trace): + inner = e + for ctx in trace[:-1]: + assert inner.ctx == ctx + inner = inner.error_msg + assert inner.error_msg == trace[-1] + + +def test_ValidationError_simple_str(): + assert str(ValidationError('error msg')) == ( + '\n' + '=====> error msg' + ) + + +def test_ValidationError_nested(): + error = ValidationError( + ValidationError( + ValidationError('error msg'), + ctx='At line 1', + ), + ctx='In file foo', + ) + assert str(error) == ( + '\n' + '==> In file foo\n' + '==> At line 1\n' + '=====> error msg' + ) + + +def test_check_one_of(): + with pytest.raises(ValidationError) as excinfo: + check_one_of((1, 2))(3) + assert excinfo.value.error_msg == 'Expected one of 1, 2 but got: 3' + + +def test_check_one_of_ok(): + check_one_of((1, 2))(2) + + +def test_check_regex(): + with pytest.raises(ValidationError) as excinfo: + check_regex('(') + assert excinfo.value.error_msg == "'(' is not a valid python regex" + + +def test_check_regex_ok(): + check_regex('^$') + + +def test_check_array_failed_inner_check(): + check = check_array(check_bool) + with pytest.raises(ValidationError) as excinfo: + check([True, False, 5]) + _assert_exception_trace( + excinfo.value, ('At index 2', 'Expected bool got int'), + ) + + +def test_check_array_ok(): + check_array(check_bool)([True, False]) + + +def test_check_and(): + check = check_and(check_type(str), check_regex) + with pytest.raises(ValidationError) as excinfo: + check(True) + assert excinfo.value.error_msg == 'Expected str got bool' + with pytest.raises(ValidationError) as excinfo: + check('(') + assert excinfo.value.error_msg == "'(' is not a valid python regex" + + +def test_check_and_ok(): + check = check_and(check_type(str), check_regex) + check('^$') + + +@pytest.mark.parametrize( + ('val', 'expected'), + (('bar', True), ('foo', False), (MISSING, False)), +) +def test_not(val, expected): + compared = Not('foo') + assert (val == compared) is expected + assert (compared == val) is expected + + +@pytest.mark.parametrize( + ('values', 'expected'), + (('bar', True), ('foo', False), (MISSING, False)), +) +def test_not_in(values, expected): + compared = NotIn('baz', 'foo') + assert (values == compared) is expected + assert (compared == values) is expected + + +@pytest.mark.parametrize( + ('values', 'expected'), + (('bar', False), ('foo', True), ('baz', True), (MISSING, False)), +) +def test_in(values, expected): + compared = In('baz', 'foo') + assert (values == compared) is expected + assert (compared == values) is expected + + +trivial_array_schema = Array(Map('foo', 'id')) +trivial_array_schema_nonempty = Array(Map('foo', 'id'), allow_empty=False) + + +def test_validate_top_level_array_not_an_array(): + with pytest.raises(ValidationError) as excinfo: + validate({}, trivial_array_schema) + assert excinfo.value.error_msg == "Expected array but got 'dict'" + + +def test_validate_top_level_array_no_objects(): + with pytest.raises(ValidationError) as excinfo: + validate([], trivial_array_schema_nonempty) + assert excinfo.value.error_msg == "Expected at least 1 'foo'" + + +def test_trivial_array_schema_ok_empty(): + validate([], trivial_array_schema) + + +@pytest.mark.parametrize('v', (({},), [{}])) +def test_ok_both_types(v): + validate(v, trivial_array_schema) + + +map_required = Map('foo', 'key', Required('key', check_bool)) +map_optional = Map('foo', 'key', Optional('key', check_bool, False)) +map_no_default = Map('foo', 'key', OptionalNoDefault('key', check_bool)) +map_no_id_key = Map('foo', None, Required('key', check_bool)) + + +def test_map_wrong_type(): + with pytest.raises(ValidationError) as excinfo: + validate([], map_required) + assert excinfo.value.error_msg == 'Expected a foo map but got a list' + + +def test_required_missing_key(): + with pytest.raises(ValidationError) as excinfo: + validate({}, map_required) + expected = ('At foo(key=MISSING)', 'Missing required key: key') + _assert_exception_trace(excinfo.value, expected) + + +@pytest.mark.parametrize( + 'schema', (map_required, map_optional, map_no_default), +) +def test_map_value_wrong_type(schema): + with pytest.raises(ValidationError) as excinfo: + validate({'key': 5}, schema) + expected = ('At foo(key=5)', 'At key: key', 'Expected bool got int') + _assert_exception_trace(excinfo.value, expected) + + +@pytest.mark.parametrize( + 'schema', (map_required, map_optional, map_no_default), +) +def test_map_value_correct_type(schema): + validate({'key': True}, schema) + + +@pytest.mark.parametrize('schema', (map_optional, map_no_default)) +def test_optional_key_missing(schema): + validate({}, schema) + + +def test_error_message_no_id_key(): + with pytest.raises(ValidationError) as excinfo: + validate({'key': 5}, map_no_id_key) + expected = ('At foo()', 'At key: key', 'Expected bool got int') + _assert_exception_trace(excinfo.value, expected) + + +map_conditional = Map( + 'foo', 'key', + Conditional( + 'key2', check_bool, condition_key='key', condition_value=True, + ), +) +map_conditional_not = Map( + 'foo', 'key', + Conditional( + 'key2', check_bool, condition_key='key', condition_value=Not(False), + ), +) +map_conditional_absent = Map( + 'foo', 'key', + Conditional( + 'key2', check_bool, + condition_key='key', condition_value=True, ensure_absent=True, + ), +) +map_conditional_absent_not = Map( + 'foo', 'key', + Conditional( + 'key2', check_bool, + condition_key='key', condition_value=Not(True), ensure_absent=True, + ), +) +map_conditional_absent_not_in = Map( + 'foo', 'key', + Conditional( + 'key2', check_bool, + condition_key='key', condition_value=NotIn(1, 2), ensure_absent=True, + ), +) +map_conditional_absent_in = Map( + 'foo', 'key', + Conditional( + 'key2', check_bool, + condition_key='key', condition_value=In(1, 2), ensure_absent=True, + ), +) + + +@pytest.mark.parametrize('schema', (map_conditional, map_conditional_not)) +@pytest.mark.parametrize( + 'v', + ( + # Conditional check passes, key2 is checked and passes + {'key': True, 'key2': True}, + # Conditional check fails, key2 is not checked + {'key': False, 'key2': 'ohai'}, + ), +) +def test_ok_conditional_schemas(v, schema): + validate(v, schema) + + +@pytest.mark.parametrize('schema', (map_conditional, map_conditional_not)) +def test_not_ok_conditional_schemas(schema): + with pytest.raises(ValidationError) as excinfo: + validate({'key': True, 'key2': 5}, schema) + expected = ('At foo(key=True)', 'At key: key2', 'Expected bool got int') + _assert_exception_trace(excinfo.value, expected) + + +def test_ensure_absent_conditional(): + with pytest.raises(ValidationError) as excinfo: + validate({'key': False, 'key2': True}, map_conditional_absent) + expected = ( + 'At foo(key=False)', + 'Expected key2 to be absent when key is not True, ' + 'found key2: True', + ) + _assert_exception_trace(excinfo.value, expected) + + +def test_ensure_absent_conditional_not(): + with pytest.raises(ValidationError) as excinfo: + validate({'key': True, 'key2': True}, map_conditional_absent_not) + expected = ( + 'At foo(key=True)', + 'Expected key2 to be absent when key is True, ' + 'found key2: True', + ) + _assert_exception_trace(excinfo.value, expected) + + +def test_ensure_absent_conditional_not_in(): + with pytest.raises(ValidationError) as excinfo: + validate({'key': 1, 'key2': True}, map_conditional_absent_not_in) + expected = ( + 'At foo(key=1)', + 'Expected key2 to be absent when key is any of (1, 2), ' + 'found key2: True', + ) + _assert_exception_trace(excinfo.value, expected) + + +def test_ensure_absent_conditional_in(): + with pytest.raises(ValidationError) as excinfo: + validate({'key': 3, 'key2': True}, map_conditional_absent_in) + expected = ( + 'At foo(key=3)', + 'Expected key2 to be absent when key is not any of (1, 2), ' + 'found key2: True', + ) + _assert_exception_trace(excinfo.value, expected) + + +def test_no_error_conditional_absent(): + validate({}, map_conditional_absent) + validate({}, map_conditional_absent_not) + validate({'key2': True}, map_conditional_absent) + validate({'key2': True}, map_conditional_absent_not) + + +def test_apply_defaults_copies_object(): + val = {} + ret = apply_defaults(val, map_optional) + assert ret is not val + + +def test_apply_defaults_sets_default(): + ret = apply_defaults({}, map_optional) + assert ret == {'key': False} + + +def test_apply_defaults_does_not_change_non_default(): + ret = apply_defaults({'key': True}, map_optional) + assert ret == {'key': True} + + +def test_apply_defaults_does_nothing_on_non_optional(): + ret = apply_defaults({}, map_required) + assert ret == {} + + +def test_apply_defaults_map_in_list(): + ret = apply_defaults([{}], Array(map_optional)) + assert ret == [{'key': False}] + + +def test_remove_defaults_copies_object(): + val = {'key': False} + ret = remove_defaults(val, map_optional) + assert ret is not val + + +def test_remove_defaults_removes_defaults(): + ret = remove_defaults({'key': False}, map_optional) + assert ret == {} + + +def test_remove_defaults_nothing_to_remove(): + ret = remove_defaults({}, map_optional) + assert ret == {} + + +def test_remove_defaults_does_not_change_non_default(): + ret = remove_defaults({'key': True}, map_optional) + assert ret == {'key': True} + + +def test_remove_defaults_map_in_list(): + ret = remove_defaults([{'key': False}], Array(map_optional)) + assert ret == [{}] + + +def test_remove_defaults_does_nothing_on_non_optional(): + ret = remove_defaults({'key': True}, map_required) + assert ret == {'key': True} + + +nested_schema_required = Map( + 'Repository', 'repo', + Required('repo', check_any), + RequiredRecurse('hooks', Array(map_required)), +) +nested_schema_optional = Map( + 'Repository', 'repo', + Required('repo', check_any), + RequiredRecurse('hooks', Array(map_optional)), +) + + +def test_validate_failure_nested(): + with pytest.raises(ValidationError) as excinfo: + validate({'repo': 1, 'hooks': [{}]}, nested_schema_required) + expected = ( + 'At Repository(repo=1)', + 'At key: hooks', + 'At foo(key=MISSING)', + 'Missing required key: key', + ) + _assert_exception_trace(excinfo.value, expected) + + +def test_apply_defaults_nested(): + val = {'repo': 'repo1', 'hooks': [{}]} + ret = apply_defaults(val, nested_schema_optional) + assert ret == {'repo': 'repo1', 'hooks': [{'key': False}]} + + +def test_remove_defaults_nested(): + val = {'repo': 'repo1', 'hooks': [{'key': False}]} + ret = remove_defaults(val, nested_schema_optional) + assert ret == {'repo': 'repo1', 'hooks': [{}]} + + +link = Map('Link', 'key', Required('key', check_bool)) +optional_nested_schema = Map( + 'Config', None, + OptionalRecurse('links', Array(link), []), +) + + +def test_validate_failure_optional_recurse(): + with pytest.raises(ValidationError) as excinfo: + validate({'links': [{}]}, optional_nested_schema) + expected = ( + 'At Config()', + 'At key: links', + 'At Link(key=MISSING)', + 'Missing required key: key', + ) + _assert_exception_trace(excinfo.value, expected) + + +def test_optional_recurse_ok_missing(): + validate({}, optional_nested_schema) + + +def test_apply_defaults_optional_recurse_missing(): + ret = apply_defaults({}, optional_nested_schema) + assert ret == {'links': []} + + +def test_apply_defaults_optional_recurse_already_present(): + ret = apply_defaults({'links': [{'key': True}]}, optional_nested_schema) + assert ret == {'links': [{'key': True}]} + + +def test_remove_defaults_optional_recurse_not_present(): + assert remove_defaults({}, optional_nested_schema) == {} + + +def test_remove_defaults_optional_recurse_present_at_default(): + assert remove_defaults({'links': []}, optional_nested_schema) == {} + + +def test_remove_defaults_optional_recurse_non_default(): + ret = remove_defaults({'links': [{'key': True}]}, optional_nested_schema) + assert ret == {'links': [{'key': True}]} + + +builder_opts = Map('BuilderOpts', None, Optional('noop', check_bool, True)) +optional_nested_optional_schema = Map( + 'Config', None, + OptionalRecurse('builder', builder_opts, {}), +) + + +def test_optional_optional_apply_defaults(): + ret = apply_defaults({}, optional_nested_optional_schema) + assert ret == {'builder': {'noop': True}} + + +def test_optional_optional_remove_defaults(): + val = {'builder': {'noop': True}} + ret = remove_defaults(val, optional_nested_optional_schema) + assert ret == {} + + +params1_schema = Map('Params1', None, Required('p1', check_bool)) +params2_schema = Map('Params2', None, Required('p2', check_bool)) +conditional_nested_schema = Map( + 'Config', None, + Required('type', check_any), + ConditionalRecurse('params', params1_schema, 'type', 'type1'), + ConditionalRecurse('params', params2_schema, 'type', 'type2'), +) + + +@pytest.mark.parametrize( + 'val', + ( + {'type': 'type3'}, # matches no condition + {'type': 'type1', 'params': {'p1': True}}, + {'type': 'type2', 'params': {'p2': True}}, + ), +) +def test_conditional_recurse_ok(val): + validate(val, conditional_nested_schema) + + +def test_conditional_recurse_error(): + with pytest.raises(ValidationError) as excinfo: + val = {'type': 'type1', 'params': {'p2': True}} + validate(val, conditional_nested_schema) + expected = ( + 'At Config()', + 'At key: params', + 'At Params1()', + 'Missing required key: p1', + ) + _assert_exception_trace(excinfo.value, expected) + + +class Error(Exception): + pass + + +def test_load_from_filename_file_does_not_exist(): + with pytest.raises(Error) as excinfo: + load_from_filename('does_not_exist', map_required, json.loads, Error) + assert excinfo.value.args[0].error_msg == 'does_not_exist does not exist' + + +def test_load_from_filename_fails_load_strategy(tmpdir): + f = tmpdir.join('foo.notjson') + f.write('totes not json') + with pytest.raises(Error) as excinfo: + load_from_filename(f.strpath, map_required, json.loads, Error) + # ANY is json's error message + expected = (f'File {f.strpath}', mock.ANY) + _assert_exception_trace(excinfo.value.args[0], expected) + + +def test_load_from_filename_validation_error(tmpdir): + f = tmpdir.join('foo.json') + f.write('{}') + with pytest.raises(Error) as excinfo: + load_from_filename(f.strpath, map_required, json.loads, Error) + expected = ( + f'File {f.strpath}', + 'At foo(key=MISSING)', + 'Missing required key: key', + ) + _assert_exception_trace(excinfo.value.args[0], expected) + + +def test_load_from_filename_applies_defaults(tmpdir): + f = tmpdir.join('foo.json') + f.write('{}') + ret = load_from_filename(f.strpath, map_optional, json.loads, Error) + assert ret == {'key': False} + + +conditional_recurse = Map( + 'Map', None, + + Required('t', check_bool), + ConditionalRecurse( + 'v', Map('Inner', 'k', Optional('k', check_bool, True)), + 't', True, + ), + ConditionalRecurse( + 'v', Map('Inner', 'k', Optional('k', check_bool, False)), + 't', False, + ), +) + + +@pytest.mark.parametrize('tvalue', (True, False)) +def test_conditional_recurse_apply_defaults(tvalue): + val = {'t': tvalue, 'v': {}} + ret = apply_defaults(val, conditional_recurse) + assert ret == {'t': tvalue, 'v': {'k': tvalue}} + + val = {'t': tvalue, 'v': {'k': not tvalue}} + ret = apply_defaults(val, conditional_recurse) + assert ret == {'t': tvalue, 'v': {'k': not tvalue}} + + +@pytest.mark.parametrize('tvalue', (True, False)) +def test_conditional_recurse_remove_defaults(tvalue): + val = {'t': tvalue, 'v': {'k': tvalue}} + ret = remove_defaults(val, conditional_recurse) + assert ret == {'t': tvalue, 'v': {}} + + val = {'t': tvalue, 'v': {'k': not tvalue}} + ret = remove_defaults(val, conditional_recurse) + assert ret == {'t': tvalue, 'v': {'k': not tvalue}} + + +conditional_optional = Map( + 'Map', None, + + Required('t', check_bool), + ConditionalOptional('v', check_bool, True, 't', True), + ConditionalOptional('v', check_bool, False, 't', False), +) + + +@pytest.mark.parametrize('tvalue', (True, False)) +def test_conditional_optional_check(tvalue): + with pytest.raises(ValidationError) as excinfo: + validate({'t': tvalue, 'v': 2}, conditional_optional) + expected = ( + 'At Map()', + 'At key: v', + 'Expected bool got int', + ) + _assert_exception_trace(excinfo.value, expected) + + validate({'t': tvalue, 'v': tvalue}, conditional_optional) + + +@pytest.mark.parametrize('tvalue', (True, False)) +def test_conditional_optional_apply_default(tvalue): + ret = apply_defaults({'t': tvalue}, conditional_optional) + assert ret == {'t': tvalue, 'v': tvalue} + + +@pytest.mark.parametrize('tvalue', (True, False)) +def test_conditional_optional_remove_default(tvalue): + ret = remove_defaults({'t': tvalue, 'v': tvalue}, conditional_optional) + assert ret == {'t': tvalue} + ret = remove_defaults({'t': tvalue, 'v': not tvalue}, conditional_optional) + assert ret == {'t': tvalue, 'v': not tvalue} + + +no_additional_keys = Map( + 'Map', None, + Required(True, check_bool), + NoAdditionalKeys((True,)), +) + + +def test_no_additional_keys(): + with pytest.raises(ValidationError) as excinfo: + validate({True: True, False: False}, no_additional_keys) + expected = ( + 'At Map()', + 'Additional keys found: False. Only these keys are allowed: True', + ) + _assert_exception_trace(excinfo.value, expected) + + validate({True: True}, no_additional_keys) + + +@pytest.fixture +def warn_additional_keys(): + ret = mock.Mock() + + def callback(extra, keys, dct): + return ret.record(extra, keys, dct) + + ret.schema = Map( + 'Map', None, + Required(True, check_bool), + WarnAdditionalKeys((True,), callback), + ) + yield ret + + +def test_warn_additional_keys_when_has_extra_keys(warn_additional_keys): + validate({True: True, False: False}, warn_additional_keys.schema) + assert warn_additional_keys.record.called + + +def test_warn_additional_keys_when_no_extra_keys(warn_additional_keys): + validate({True: True}, warn_additional_keys.schema) + assert not warn_additional_keys.record.called @@ -0,0 +1,18 @@ +[tox] +envlist = py36,py37,pypy3,pre-commit + +[testenv] +deps = -rrequirements-dev.txt +commands = + coverage erase + coverage run -m pytest {posargs:tests} + coverage report --fail-under 100 + pre-commit install + +[testenv:pre-commit] +skip_install = true +deps = pre-commit +commands = pre-commit run --all-files --show-diff-on-failure + +[pep8] +ignore = E265,E501,W504 |