diff options
Diffstat (limited to '')
-rw-r--r-- | third_party/python/voluptuous/voluptuous/schema_builder.py | 1301 |
1 files changed, 1301 insertions, 0 deletions
diff --git a/third_party/python/voluptuous/voluptuous/schema_builder.py b/third_party/python/voluptuous/voluptuous/schema_builder.py new file mode 100644 index 0000000000..df19c8da2d --- /dev/null +++ b/third_party/python/voluptuous/voluptuous/schema_builder.py @@ -0,0 +1,1301 @@ +import collections +import inspect +import re +from functools import wraps +import sys +from contextlib import contextmanager + +import itertools +from voluptuous import error as er + +if sys.version_info >= (3,): + long = int + unicode = str + basestring = str + ifilter = filter + + def iteritems(d): + return d.items() +else: + from itertools import ifilter + + def iteritems(d): + return d.iteritems() + +if sys.version_info >= (3, 3): + _Mapping = collections.abc.Mapping +else: + _Mapping = collections.Mapping + +"""Schema validation for Python data structures. + +Given eg. a nested data structure like this: + + { + 'exclude': ['Users', 'Uptime'], + 'include': [], + 'set': { + 'snmp_community': 'public', + 'snmp_timeout': 15, + 'snmp_version': '2c', + }, + 'targets': { + 'localhost': { + 'exclude': ['Uptime'], + 'features': { + 'Uptime': { + 'retries': 3, + }, + 'Users': { + 'snmp_community': 'monkey', + 'snmp_port': 15, + }, + }, + 'include': ['Users'], + 'set': { + 'snmp_community': 'monkeys', + }, + }, + }, + } + +A schema like this: + + >>> settings = { + ... 'snmp_community': str, + ... 'retries': int, + ... 'snmp_version': All(Coerce(str), Any('3', '2c', '1')), + ... } + >>> features = ['Ping', 'Uptime', 'Http'] + >>> schema = Schema({ + ... 'exclude': features, + ... 'include': features, + ... 'set': settings, + ... 'targets': { + ... 'exclude': features, + ... 'include': features, + ... 'features': { + ... str: settings, + ... }, + ... }, + ... }) + +Validate like so: + + >>> schema({ + ... 'set': { + ... 'snmp_community': 'public', + ... 'snmp_version': '2c', + ... }, + ... 'targets': { + ... 'exclude': ['Ping'], + ... 'features': { + ... 'Uptime': {'retries': 3}, + ... 'Users': {'snmp_community': 'monkey'}, + ... }, + ... }, + ... }) == { + ... 'set': {'snmp_version': '2c', 'snmp_community': 'public'}, + ... 'targets': { + ... 'exclude': ['Ping'], + ... 'features': {'Uptime': {'retries': 3}, + ... 'Users': {'snmp_community': 'monkey'}}}} + True +""" + +# options for extra keys +PREVENT_EXTRA = 0 # any extra key not in schema will raise an error +ALLOW_EXTRA = 1 # extra keys not in schema will be included in output +REMOVE_EXTRA = 2 # extra keys not in schema will be excluded from output + + +def _isnamedtuple(obj): + return isinstance(obj, tuple) and hasattr(obj, '_fields') + + +primitive_types = (str, unicode, bool, int, float) + + +class Undefined(object): + def __nonzero__(self): + return False + + def __repr__(self): + return '...' + + +UNDEFINED = Undefined() + + +def Self(): + raise er.SchemaError('"Self" should never be called') + + +def default_factory(value): + if value is UNDEFINED or callable(value): + return value + return lambda: value + + +@contextmanager +def raises(exc, msg=None, regex=None): + try: + yield + except exc as e: + if msg is not None: + assert str(e) == msg, '%r != %r' % (str(e), msg) + if regex is not None: + assert re.search(regex, str(e)), '%r does not match %r' % (str(e), regex) + + +def Extra(_): + """Allow keys in the data that are not present in the schema.""" + raise er.SchemaError('"Extra" should never be called') + + +# As extra() is never called there's no way to catch references to the +# deprecated object, so we just leave an alias here instead. +extra = Extra + + +class Schema(object): + """A validation schema. + + The schema is a Python tree-like structure where nodes are pattern + matched against corresponding trees of values. + + Nodes can be values, in which case a direct comparison is used, types, + in which case an isinstance() check is performed, or callables, which will + validate and optionally convert the value. + + We can equate schemas also. + + For Example: + + >>> v = Schema({Required('a'): unicode}) + >>> v1 = Schema({Required('a'): unicode}) + >>> v2 = Schema({Required('b'): unicode}) + >>> assert v == v1 + >>> assert v != v2 + + """ + + _extra_to_name = { + REMOVE_EXTRA: 'REMOVE_EXTRA', + ALLOW_EXTRA: 'ALLOW_EXTRA', + PREVENT_EXTRA: 'PREVENT_EXTRA', + } + + def __init__(self, schema, required=False, extra=PREVENT_EXTRA): + """Create a new Schema. + + :param schema: Validation schema. See :module:`voluptuous` for details. + :param required: Keys defined in the schema must be in the data. + :param extra: Specify how extra keys in the data are treated: + - :const:`~voluptuous.PREVENT_EXTRA`: to disallow any undefined + extra keys (raise ``Invalid``). + - :const:`~voluptuous.ALLOW_EXTRA`: to include undefined extra + keys in the output. + - :const:`~voluptuous.REMOVE_EXTRA`: to exclude undefined extra keys + from the output. + - Any value other than the above defaults to + :const:`~voluptuous.PREVENT_EXTRA` + """ + self.schema = schema + self.required = required + self.extra = int(extra) # ensure the value is an integer + self._compiled = self._compile(schema) + + @classmethod + def infer(cls, data, **kwargs): + """Create a Schema from concrete data (e.g. an API response). + + For example, this will take a dict like: + + { + 'foo': 1, + 'bar': { + 'a': True, + 'b': False + }, + 'baz': ['purple', 'monkey', 'dishwasher'] + } + + And return a Schema: + + { + 'foo': int, + 'bar': { + 'a': bool, + 'b': bool + }, + 'baz': [str] + } + + Note: only very basic inference is supported. + """ + def value_to_schema_type(value): + if isinstance(value, dict): + if len(value) == 0: + return dict + return {k: value_to_schema_type(v) + for k, v in iteritems(value)} + if isinstance(value, list): + if len(value) == 0: + return list + else: + return [value_to_schema_type(v) + for v in value] + return type(value) + + return cls(value_to_schema_type(data), **kwargs) + + def __eq__(self, other): + if not isinstance(other, Schema): + return False + return other.schema == self.schema + + def __ne__(self, other): + return not (self == other) + + def __str__(self): + return str(self.schema) + + def __repr__(self): + return "<Schema(%s, extra=%s, required=%s) object at 0x%x>" % ( + self.schema, self._extra_to_name.get(self.extra, '??'), + self.required, id(self)) + + def __call__(self, data): + """Validate data against this schema.""" + try: + return self._compiled([], data) + except er.MultipleInvalid: + raise + except er.Invalid as e: + raise er.MultipleInvalid([e]) + # return self.validate([], self.schema, data) + + def _compile(self, schema): + if schema is Extra: + return lambda _, v: v + if schema is Self: + return lambda p, v: self._compiled(p, v) + elif hasattr(schema, "__voluptuous_compile__"): + return schema.__voluptuous_compile__(self) + if isinstance(schema, Object): + return self._compile_object(schema) + if isinstance(schema, _Mapping): + return self._compile_dict(schema) + elif isinstance(schema, list): + return self._compile_list(schema) + elif isinstance(schema, tuple): + return self._compile_tuple(schema) + elif isinstance(schema, (frozenset, set)): + return self._compile_set(schema) + type_ = type(schema) + if inspect.isclass(schema): + type_ = schema + if type_ in (bool, bytes, int, long, str, unicode, float, complex, object, + list, dict, type(None)) or callable(schema): + return _compile_scalar(schema) + raise er.SchemaError('unsupported schema data type %r' % + type(schema).__name__) + + def _compile_mapping(self, schema, invalid_msg=None): + """Create validator for given mapping.""" + invalid_msg = invalid_msg or 'mapping value' + + # Keys that may be required + all_required_keys = set(key for key in schema + if key is not Extra and + ((self.required and not isinstance(key, (Optional, Remove))) or + isinstance(key, Required))) + + # Keys that may have defaults + all_default_keys = set(key for key in schema + if isinstance(key, Required) or + isinstance(key, Optional)) + + _compiled_schema = {} + for skey, svalue in iteritems(schema): + new_key = self._compile(skey) + new_value = self._compile(svalue) + _compiled_schema[skey] = (new_key, new_value) + + candidates = list(_iterate_mapping_candidates(_compiled_schema)) + + # After we have the list of candidates in the correct order, we want to apply some optimization so that each + # key in the data being validated will be matched against the relevant schema keys only. + # No point in matching against different keys + additional_candidates = [] + candidates_by_key = {} + for skey, (ckey, cvalue) in candidates: + if type(skey) in primitive_types: + candidates_by_key.setdefault(skey, []).append((skey, (ckey, cvalue))) + elif isinstance(skey, Marker) and type(skey.schema) in primitive_types: + candidates_by_key.setdefault(skey.schema, []).append((skey, (ckey, cvalue))) + else: + # These are wildcards such as 'int', 'str', 'Remove' and others which should be applied to all keys + additional_candidates.append((skey, (ckey, cvalue))) + + def validate_mapping(path, iterable, out): + required_keys = all_required_keys.copy() + + # Build a map of all provided key-value pairs. + # The type(out) is used to retain ordering in case a ordered + # map type is provided as input. + key_value_map = type(out)() + for key, value in iterable: + key_value_map[key] = value + + # Insert default values for non-existing keys. + for key in all_default_keys: + if not isinstance(key.default, Undefined) and \ + key.schema not in key_value_map: + # A default value has been specified for this missing + # key, insert it. + key_value_map[key.schema] = key.default() + + error = None + errors = [] + for key, value in key_value_map.items(): + key_path = path + [key] + remove_key = False + + # Optimization. Validate against the matching key first, then fallback to the rest + relevant_candidates = itertools.chain(candidates_by_key.get(key, []), additional_candidates) + + # compare each given key/value against all compiled key/values + # schema key, (compiled key, compiled value) + for skey, (ckey, cvalue) in relevant_candidates: + try: + new_key = ckey(key_path, key) + except er.Invalid as e: + if len(e.path) > len(key_path): + raise + if not error or len(e.path) > len(error.path): + error = e + continue + # Backtracking is not performed once a key is selected, so if + # the value is invalid we immediately throw an exception. + exception_errors = [] + # check if the key is marked for removal + is_remove = new_key is Remove + try: + cval = cvalue(key_path, value) + # include if it's not marked for removal + if not is_remove: + out[new_key] = cval + else: + remove_key = True + continue + except er.MultipleInvalid as e: + exception_errors.extend(e.errors) + except er.Invalid as e: + exception_errors.append(e) + + if exception_errors: + if is_remove or remove_key: + continue + for err in exception_errors: + if len(err.path) <= len(key_path): + err.error_type = invalid_msg + errors.append(err) + # If there is a validation error for a required + # key, this means that the key was provided. + # Discard the required key so it does not + # create an additional, noisy exception. + required_keys.discard(skey) + break + + # Key and value okay, mark as found in case it was + # a Required() field. + required_keys.discard(skey) + + break + else: + if remove_key: + # remove key + continue + elif self.extra == ALLOW_EXTRA: + out[key] = value + elif self.extra != REMOVE_EXTRA: + errors.append(er.Invalid('extra keys not allowed', key_path)) + # else REMOVE_EXTRA: ignore the key so it's removed from output + + # for any required keys left that weren't found and don't have defaults: + for key in required_keys: + msg = key.msg if hasattr(key, 'msg') and key.msg else 'required key not provided' + errors.append(er.RequiredFieldInvalid(msg, path + [key])) + if errors: + raise er.MultipleInvalid(errors) + + return out + + return validate_mapping + + def _compile_object(self, schema): + """Validate an object. + + Has the same behavior as dictionary validator but work with object + attributes. + + For example: + + >>> class Structure(object): + ... def __init__(self, one=None, three=None): + ... self.one = one + ... self.three = three + ... + >>> validate = Schema(Object({'one': 'two', 'three': 'four'}, cls=Structure)) + >>> with raises(er.MultipleInvalid, "not a valid value for object value @ data['one']"): + ... validate(Structure(one='three')) + + """ + base_validate = self._compile_mapping( + schema, invalid_msg='object value') + + def validate_object(path, data): + if schema.cls is not UNDEFINED and not isinstance(data, schema.cls): + raise er.ObjectInvalid('expected a {0!r}'.format(schema.cls), path) + iterable = _iterate_object(data) + iterable = ifilter(lambda item: item[1] is not None, iterable) + out = base_validate(path, iterable, {}) + return type(data)(**out) + + return validate_object + + def _compile_dict(self, schema): + """Validate a dictionary. + + A dictionary schema can contain a set of values, or at most one + validator function/type. + + A dictionary schema will only validate a dictionary: + + >>> validate = Schema({}) + >>> with raises(er.MultipleInvalid, 'expected a dictionary'): + ... validate([]) + + An invalid dictionary value: + + >>> validate = Schema({'one': 'two', 'three': 'four'}) + >>> with raises(er.MultipleInvalid, "not a valid value for dictionary value @ data['one']"): + ... validate({'one': 'three'}) + + An invalid key: + + >>> with raises(er.MultipleInvalid, "extra keys not allowed @ data['two']"): + ... validate({'two': 'three'}) + + + Validation function, in this case the "int" type: + + >>> validate = Schema({'one': 'two', 'three': 'four', int: str}) + + Valid integer input: + + >>> validate({10: 'twenty'}) + {10: 'twenty'} + + By default, a "type" in the schema (in this case "int") will be used + purely to validate that the corresponding value is of that type. It + will not Coerce the value: + + >>> with raises(er.MultipleInvalid, "extra keys not allowed @ data['10']"): + ... validate({'10': 'twenty'}) + + Wrap them in the Coerce() function to achieve this: + >>> from voluptuous import Coerce + >>> validate = Schema({'one': 'two', 'three': 'four', + ... Coerce(int): str}) + >>> validate({'10': 'twenty'}) + {10: 'twenty'} + + Custom message for required key + + >>> validate = Schema({Required('one', 'required'): 'two'}) + >>> with raises(er.MultipleInvalid, "required @ data['one']"): + ... validate({}) + + (This is to avoid unexpected surprises.) + + Multiple errors for nested field in a dict: + + >>> validate = Schema({ + ... 'adict': { + ... 'strfield': str, + ... 'intfield': int + ... } + ... }) + >>> try: + ... validate({ + ... 'adict': { + ... 'strfield': 123, + ... 'intfield': 'one' + ... } + ... }) + ... except er.MultipleInvalid as e: + ... print(sorted(str(i) for i in e.errors)) # doctest: +NORMALIZE_WHITESPACE + ["expected int for dictionary value @ data['adict']['intfield']", + "expected str for dictionary value @ data['adict']['strfield']"] + + """ + base_validate = self._compile_mapping( + schema, invalid_msg='dictionary value') + + groups_of_exclusion = {} + groups_of_inclusion = {} + for node in schema: + if isinstance(node, Exclusive): + g = groups_of_exclusion.setdefault(node.group_of_exclusion, []) + g.append(node) + elif isinstance(node, Inclusive): + g = groups_of_inclusion.setdefault(node.group_of_inclusion, []) + g.append(node) + + def validate_dict(path, data): + if not isinstance(data, dict): + raise er.DictInvalid('expected a dictionary', path) + + errors = [] + for label, group in groups_of_exclusion.items(): + exists = False + for exclusive in group: + if exclusive.schema in data: + if exists: + msg = exclusive.msg if hasattr(exclusive, 'msg') and exclusive.msg else \ + "two or more values in the same group of exclusion '%s'" % label + next_path = path + [VirtualPathComponent(label)] + errors.append(er.ExclusiveInvalid(msg, next_path)) + break + exists = True + + if errors: + raise er.MultipleInvalid(errors) + + for label, group in groups_of_inclusion.items(): + included = [node.schema in data for node in group] + if any(included) and not all(included): + msg = "some but not all values in the same group of inclusion '%s'" % label + for g in group: + if hasattr(g, 'msg') and g.msg: + msg = g.msg + break + next_path = path + [VirtualPathComponent(label)] + errors.append(er.InclusiveInvalid(msg, next_path)) + break + + if errors: + raise er.MultipleInvalid(errors) + + out = data.__class__() + return base_validate(path, iteritems(data), out) + + return validate_dict + + def _compile_sequence(self, schema, seq_type): + """Validate a sequence type. + + This is a sequence of valid values or validators tried in order. + + >>> validator = Schema(['one', 'two', int]) + >>> validator(['one']) + ['one'] + >>> with raises(er.MultipleInvalid, 'expected int @ data[0]'): + ... validator([3.5]) + >>> validator([1]) + [1] + """ + _compiled = [self._compile(s) for s in schema] + seq_type_name = seq_type.__name__ + + def validate_sequence(path, data): + if not isinstance(data, seq_type): + raise er.SequenceTypeInvalid('expected a %s' % seq_type_name, path) + + # Empty seq schema, reject any data. + if not schema: + if data: + raise er.MultipleInvalid([ + er.ValueInvalid('not a valid value', path if path else data) + ]) + return data + + out = [] + invalid = None + errors = [] + index_path = UNDEFINED + for i, value in enumerate(data): + index_path = path + [i] + invalid = None + for validate in _compiled: + try: + cval = validate(index_path, value) + if cval is not Remove: # do not include Remove values + out.append(cval) + break + except er.Invalid as e: + if len(e.path) > len(index_path): + raise + invalid = e + else: + errors.append(invalid) + if errors: + raise er.MultipleInvalid(errors) + + if _isnamedtuple(data): + return type(data)(*out) + else: + return type(data)(out) + + return validate_sequence + + def _compile_tuple(self, schema): + """Validate a tuple. + + A tuple is a sequence of valid values or validators tried in order. + + >>> validator = Schema(('one', 'two', int)) + >>> validator(('one',)) + ('one',) + >>> with raises(er.MultipleInvalid, 'expected int @ data[0]'): + ... validator((3.5,)) + >>> validator((1,)) + (1,) + """ + return self._compile_sequence(schema, tuple) + + def _compile_list(self, schema): + """Validate a list. + + A list is a sequence of valid values or validators tried in order. + + >>> validator = Schema(['one', 'two', int]) + >>> validator(['one']) + ['one'] + >>> with raises(er.MultipleInvalid, 'expected int @ data[0]'): + ... validator([3.5]) + >>> validator([1]) + [1] + """ + return self._compile_sequence(schema, list) + + def _compile_set(self, schema): + """Validate a set. + + A set is an unordered collection of unique elements. + + >>> validator = Schema({int}) + >>> validator(set([42])) == set([42]) + True + >>> with raises(er.Invalid, 'expected a set'): + ... validator(42) + >>> with raises(er.MultipleInvalid, 'invalid value in set'): + ... validator(set(['a'])) + """ + type_ = type(schema) + type_name = type_.__name__ + + def validate_set(path, data): + if not isinstance(data, type_): + raise er.Invalid('expected a %s' % type_name, path) + + _compiled = [self._compile(s) for s in schema] + errors = [] + for value in data: + for validate in _compiled: + try: + validate(path, value) + break + except er.Invalid: + pass + else: + invalid = er.Invalid('invalid value in %s' % type_name, path) + errors.append(invalid) + + if errors: + raise er.MultipleInvalid(errors) + + return data + + return validate_set + + def extend(self, schema, required=None, extra=None): + """Create a new `Schema` by merging this and the provided `schema`. + + Neither this `Schema` nor the provided `schema` are modified. The + resulting `Schema` inherits the `required` and `extra` parameters of + this, unless overridden. + + Both schemas must be dictionary-based. + + :param schema: dictionary to extend this `Schema` with + :param required: if set, overrides `required` of this `Schema` + :param extra: if set, overrides `extra` of this `Schema` + """ + + assert type(self.schema) == dict and type(schema) == dict, 'Both schemas must be dictionary-based' + + result = self.schema.copy() + + # returns the key that may have been passed as an argument to Marker constructor + def key_literal(key): + return (key.schema if isinstance(key, Marker) else key) + + # build a map that takes the key literals to the needed objects + # literal -> Required|Optional|literal + result_key_map = dict((key_literal(key), key) for key in result) + + # for each item in the extension schema, replace duplicates + # or add new keys + for key, value in iteritems(schema): + + # if the key is already in the dictionary, we need to replace it + # transform key to literal before checking presence + if key_literal(key) in result_key_map: + + result_key = result_key_map[key_literal(key)] + result_value = result[result_key] + + # if both are dictionaries, we need to extend recursively + # create the new extended sub schema, then remove the old key and add the new one + if type(result_value) == dict and type(value) == dict: + new_value = Schema(result_value).extend(value).schema + del result[result_key] + result[key] = new_value + # one or the other or both are not sub-schemas, simple replacement is fine + # remove old key and add new one + else: + del result[result_key] + result[key] = value + + # key is new and can simply be added + else: + result[key] = value + + # recompile and send old object + result_cls = type(self) + result_required = (required if required is not None else self.required) + result_extra = (extra if extra is not None else self.extra) + return result_cls(result, required=result_required, extra=result_extra) + + +def _compile_scalar(schema): + """A scalar value. + + The schema can either be a value or a type. + + >>> _compile_scalar(int)([], 1) + 1 + >>> with raises(er.Invalid, 'expected float'): + ... _compile_scalar(float)([], '1') + + Callables have + >>> _compile_scalar(lambda v: float(v))([], '1') + 1.0 + + As a convenience, ValueError's are trapped: + + >>> with raises(er.Invalid, 'not a valid value'): + ... _compile_scalar(lambda v: float(v))([], 'a') + """ + if inspect.isclass(schema): + def validate_instance(path, data): + if isinstance(data, schema): + return data + else: + msg = 'expected %s' % schema.__name__ + raise er.TypeInvalid(msg, path) + + return validate_instance + + if callable(schema): + def validate_callable(path, data): + try: + return schema(data) + except ValueError: + raise er.ValueInvalid('not a valid value', path) + except er.Invalid as e: + e.prepend(path) + raise + + return validate_callable + + def validate_value(path, data): + if data != schema: + raise er.ScalarInvalid('not a valid value', path) + return data + + return validate_value + + +def _compile_itemsort(): + '''return sort function of mappings''' + + def is_extra(key_): + return key_ is Extra + + def is_remove(key_): + return isinstance(key_, Remove) + + def is_marker(key_): + return isinstance(key_, Marker) + + def is_type(key_): + return inspect.isclass(key_) + + def is_callable(key_): + return callable(key_) + + # priority list for map sorting (in order of checking) + # We want Extra to match last, because it's a catch-all. On the other hand, + # Remove markers should match first (since invalid values will not + # raise an Error, instead the validator will check if other schemas match + # the same value). + priority = [(1, is_remove), # Remove highest priority after values + (2, is_marker), # then other Markers + (4, is_type), # types/classes lowest before Extra + (3, is_callable), # callables after markers + (5, is_extra)] # Extra lowest priority + + def item_priority(item_): + key_ = item_[0] + for i, check_ in priority: + if check_(key_): + return i + # values have hightest priorities + return 0 + + return item_priority + + +_sort_item = _compile_itemsort() + + +def _iterate_mapping_candidates(schema): + """Iterate over schema in a meaningful order.""" + # Without this, Extra might appear first in the iterator, and fail to + # validate a key even though it's a Required that has its own validation, + # generating a false positive. + return sorted(iteritems(schema), key=_sort_item) + + +def _iterate_object(obj): + """Return iterator over object attributes. Respect objects with + defined __slots__. + + """ + d = {} + try: + d = vars(obj) + except TypeError: + # maybe we have named tuple here? + if hasattr(obj, '_asdict'): + d = obj._asdict() + for item in iteritems(d): + yield item + try: + slots = obj.__slots__ + except AttributeError: + pass + else: + for key in slots: + if key != '__dict__': + yield (key, getattr(obj, key)) + + +class Msg(object): + """Report a user-friendly message if a schema fails to validate. + + >>> validate = Schema( + ... Msg(['one', 'two', int], + ... 'should be one of "one", "two" or an integer')) + >>> with raises(er.MultipleInvalid, 'should be one of "one", "two" or an integer'): + ... validate(['three']) + + Messages are only applied to invalid direct descendants of the schema: + + >>> validate = Schema(Msg([['one', 'two', int]], 'not okay!')) + >>> with raises(er.MultipleInvalid, 'expected int @ data[0][0]'): + ... validate([['three']]) + + The type which is thrown can be overridden but needs to be a subclass of Invalid + + >>> with raises(er.SchemaError, 'Msg can only use subclases of Invalid as custom class'): + ... validate = Schema(Msg([int], 'should be int', cls=KeyError)) + + If you do use a subclass of Invalid, that error will be thrown (wrapped in a MultipleInvalid) + + >>> validate = Schema(Msg([['one', 'two', int]], 'not okay!', cls=er.RangeInvalid)) + >>> try: + ... validate(['three']) + ... except er.MultipleInvalid as e: + ... assert isinstance(e.errors[0], er.RangeInvalid) + """ + + def __init__(self, schema, msg, cls=None): + if cls and not issubclass(cls, er.Invalid): + raise er.SchemaError("Msg can only use subclases of" + " Invalid as custom class") + self._schema = schema + self.schema = Schema(schema) + self.msg = msg + self.cls = cls + + def __call__(self, v): + try: + return self.schema(v) + except er.Invalid as e: + if len(e.path) > 1: + raise e + else: + raise (self.cls or er.Invalid)(self.msg) + + def __repr__(self): + return 'Msg(%s, %s, cls=%s)' % (self._schema, self.msg, self.cls) + + +class Object(dict): + """Indicate that we should work with attributes, not keys.""" + + def __init__(self, schema, cls=UNDEFINED): + self.cls = cls + super(Object, self).__init__(schema) + + +class VirtualPathComponent(str): + def __str__(self): + return '<' + self + '>' + + def __repr__(self): + return self.__str__() + + +# Markers.py + + +class Marker(object): + """Mark nodes for special treatment.""" + + def __init__(self, schema_, msg=None, description=None): + self.schema = schema_ + self._schema = Schema(schema_) + self.msg = msg + self.description = description + + def __call__(self, v): + try: + return self._schema(v) + except er.Invalid as e: + if not self.msg or len(e.path) > 1: + raise + raise er.Invalid(self.msg) + + def __str__(self): + return str(self.schema) + + def __repr__(self): + return repr(self.schema) + + def __lt__(self, other): + if isinstance(other, Marker): + return self.schema < other.schema + return self.schema < other + + def __hash__(self): + return hash(self.schema) + + def __eq__(self, other): + return self.schema == other + + def __ne__(self, other): + return not(self.schema == other) + + +class Optional(Marker): + """Mark a node in the schema as optional, and optionally provide a default + + >>> schema = Schema({Optional('key'): str}) + >>> schema({}) + {} + >>> schema = Schema({Optional('key', default='value'): str}) + >>> schema({}) + {'key': 'value'} + >>> schema = Schema({Optional('key', default=list): list}) + >>> schema({}) + {'key': []} + + If 'required' flag is set for an entire schema, optional keys aren't required + + >>> schema = Schema({ + ... Optional('key'): str, + ... 'key2': str + ... }, required=True) + >>> schema({'key2':'value'}) + {'key2': 'value'} + """ + + def __init__(self, schema, msg=None, default=UNDEFINED, description=None): + super(Optional, self).__init__(schema, msg=msg, + description=description) + self.default = default_factory(default) + + +class Exclusive(Optional): + """Mark a node in the schema as exclusive. + + Exclusive keys inherited from Optional: + + >>> schema = Schema({Exclusive('alpha', 'angles'): int, Exclusive('beta', 'angles'): int}) + >>> schema({'alpha': 30}) + {'alpha': 30} + + Keys inside a same group of exclusion cannot be together, it only makes sense for dictionaries: + + >>> with raises(er.MultipleInvalid, "two or more values in the same group of exclusion 'angles' @ data[<angles>]"): + ... schema({'alpha': 30, 'beta': 45}) + + For example, API can provides multiple types of authentication, but only one works in the same time: + + >>> msg = 'Please, use only one type of authentication at the same time.' + >>> schema = Schema({ + ... Exclusive('classic', 'auth', msg=msg):{ + ... Required('email'): basestring, + ... Required('password'): basestring + ... }, + ... Exclusive('internal', 'auth', msg=msg):{ + ... Required('secret_key'): basestring + ... }, + ... Exclusive('social', 'auth', msg=msg):{ + ... Required('social_network'): basestring, + ... Required('token'): basestring + ... } + ... }) + + >>> with raises(er.MultipleInvalid, "Please, use only one type of authentication at the same time. @ data[<auth>]"): + ... schema({'classic': {'email': 'foo@example.com', 'password': 'bar'}, + ... 'social': {'social_network': 'barfoo', 'token': 'tEMp'}}) + """ + + def __init__(self, schema, group_of_exclusion, msg=None, description=None): + super(Exclusive, self).__init__(schema, msg=msg, + description=description) + self.group_of_exclusion = group_of_exclusion + + +class Inclusive(Optional): + """ Mark a node in the schema as inclusive. + + Inclusive keys inherited from Optional: + + >>> schema = Schema({ + ... Inclusive('filename', 'file'): str, + ... Inclusive('mimetype', 'file'): str + ... }) + >>> data = {'filename': 'dog.jpg', 'mimetype': 'image/jpeg'} + >>> data == schema(data) + True + + Keys inside a same group of inclusive must exist together, it only makes sense for dictionaries: + + >>> with raises(er.MultipleInvalid, "some but not all values in the same group of inclusion 'file' @ data[<file>]"): + ... schema({'filename': 'dog.jpg'}) + + If none of the keys in the group are present, it is accepted: + + >>> schema({}) + {} + + For example, API can return 'height' and 'width' together, but not separately. + + >>> msg = "Height and width must exist together" + >>> schema = Schema({ + ... Inclusive('height', 'size', msg=msg): int, + ... Inclusive('width', 'size', msg=msg): int + ... }) + + >>> with raises(er.MultipleInvalid, msg + " @ data[<size>]"): + ... schema({'height': 100}) + + >>> with raises(er.MultipleInvalid, msg + " @ data[<size>]"): + ... schema({'width': 100}) + + >>> data = {'height': 100, 'width': 100} + >>> data == schema(data) + True + """ + + def __init__(self, schema, group_of_inclusion, + msg=None, description=None, default=UNDEFINED): + super(Inclusive, self).__init__(schema, msg=msg, + default=default, + description=description) + self.group_of_inclusion = group_of_inclusion + + +class Required(Marker): + """Mark a node in the schema as being required, and optionally provide a default value. + + >>> schema = Schema({Required('key'): str}) + >>> with raises(er.MultipleInvalid, "required key not provided @ data['key']"): + ... schema({}) + + >>> schema = Schema({Required('key', default='value'): str}) + >>> schema({}) + {'key': 'value'} + >>> schema = Schema({Required('key', default=list): list}) + >>> schema({}) + {'key': []} + """ + + def __init__(self, schema, msg=None, default=UNDEFINED, description=None): + super(Required, self).__init__(schema, msg=msg, + description=description) + self.default = default_factory(default) + + +class Remove(Marker): + """Mark a node in the schema to be removed and excluded from the validated + output. Keys that fail validation will not raise ``Invalid``. Instead, these + keys will be treated as extras. + + >>> schema = Schema({str: int, Remove(int): str}) + >>> with raises(er.MultipleInvalid, "extra keys not allowed @ data[1]"): + ... schema({'keep': 1, 1: 1.0}) + >>> schema({1: 'red', 'red': 1, 2: 'green'}) + {'red': 1} + >>> schema = Schema([int, Remove(float), Extra]) + >>> schema([1, 2, 3, 4.0, 5, 6.0, '7']) + [1, 2, 3, 5, '7'] + """ + + def __call__(self, v): + super(Remove, self).__call__(v) + return self.__class__ + + def __repr__(self): + return "Remove(%r)" % (self.schema,) + + def __hash__(self): + return object.__hash__(self) + + +def message(default=None, cls=None): + """Convenience decorator to allow functions to provide a message. + + Set a default message: + + >>> @message('not an integer') + ... def isint(v): + ... return int(v) + + >>> validate = Schema(isint()) + >>> with raises(er.MultipleInvalid, 'not an integer'): + ... validate('a') + + The message can be overridden on a per validator basis: + + >>> validate = Schema(isint('bad')) + >>> with raises(er.MultipleInvalid, 'bad'): + ... validate('a') + + The class thrown too: + + >>> class IntegerInvalid(er.Invalid): pass + >>> validate = Schema(isint('bad', clsoverride=IntegerInvalid)) + >>> try: + ... validate('a') + ... except er.MultipleInvalid as e: + ... assert isinstance(e.errors[0], IntegerInvalid) + """ + if cls and not issubclass(cls, er.Invalid): + raise er.SchemaError("message can only use subclases of Invalid as custom class") + + def decorator(f): + @wraps(f) + def check(msg=None, clsoverride=None): + @wraps(f) + def wrapper(*args, **kwargs): + try: + return f(*args, **kwargs) + except ValueError: + raise (clsoverride or cls or er.ValueInvalid)(msg or default or 'invalid value') + + return wrapper + + return check + + return decorator + + +def _args_to_dict(func, args): + """Returns argument names as values as key-value pairs.""" + if sys.version_info >= (3, 0): + arg_count = func.__code__.co_argcount + arg_names = func.__code__.co_varnames[:arg_count] + else: + arg_count = func.func_code.co_argcount + arg_names = func.func_code.co_varnames[:arg_count] + + arg_value_list = list(args) + arguments = dict((arg_name, arg_value_list[i]) + for i, arg_name in enumerate(arg_names) + if i < len(arg_value_list)) + return arguments + + +def _merge_args_with_kwargs(args_dict, kwargs_dict): + """Merge args with kwargs.""" + ret = args_dict.copy() + ret.update(kwargs_dict) + return ret + + +def validate(*a, **kw): + """Decorator for validating arguments of a function against a given schema. + + Set restrictions for arguments: + + >>> @validate(arg1=int, arg2=int) + ... def foo(arg1, arg2): + ... return arg1 * arg2 + + Set restriction for returned value: + + >>> @validate(arg=int, __return__=int) + ... def bar(arg1): + ... return arg1 * 2 + + """ + RETURNS_KEY = '__return__' + + def validate_schema_decorator(func): + + returns_defined = False + returns = None + + schema_args_dict = _args_to_dict(func, a) + schema_arguments = _merge_args_with_kwargs(schema_args_dict, kw) + + if RETURNS_KEY in schema_arguments: + returns_defined = True + returns = schema_arguments[RETURNS_KEY] + del schema_arguments[RETURNS_KEY] + + input_schema = (Schema(schema_arguments, extra=ALLOW_EXTRA) + if len(schema_arguments) != 0 else lambda x: x) + output_schema = Schema(returns) if returns_defined else lambda x: x + + @wraps(func) + def func_wrapper(*args, **kwargs): + args_dict = _args_to_dict(func, args) + arguments = _merge_args_with_kwargs(args_dict, kwargs) + validated_arguments = input_schema(arguments) + output = func(**validated_arguments) + return output_schema(output) + + return func_wrapper + + return validate_schema_decorator |