# -*- coding: utf-8 -*-
""" This module contains the error-related constants and classes. """
from __future__ import absolute_import
import sys
from collections import defaultdict, namedtuple
from copy import copy, deepcopy
from functools import wraps
from pprint import pformat
from cerberus.platform import MutableMapping
from cerberus.utils import compare_paths_lt, quote_string
# Record type pairing a numeric error ``code`` with the name of the ``rule``
# that can cause it; ``rule`` is ``None`` for definitions not tied to a rule.
ErrorDefinition = namedtuple('ErrorDefinition', 'code, rule')
"""
This class is used to define possible errors. Each distinguishable error is
defined by a *unique* error ``code`` as integer and the ``rule`` that can
cause it as string.
The instances' names do not contain a common prefix as they are supposed to be
referenced within the module namespace, e.g. ``errors.CUSTOM``.
"""
# custom
CUSTOM = ErrorDefinition(0x00, None)
# existence (codes 0x01 - 0x06)
DOCUMENT_MISSING = ErrorDefinition(0x01, None) # issues/141
# NOTE: the following line rebinds the name to a plain message string, so the
# ErrorDefinition created on the previous line is no longer reachable as
# ``DOCUMENT_MISSING``.
DOCUMENT_MISSING = "document is missing"
REQUIRED_FIELD = ErrorDefinition(0x02, 'required')
UNKNOWN_FIELD = ErrorDefinition(0x03, None)
DEPENDENCIES_FIELD = ErrorDefinition(0x04, 'dependencies')
DEPENDENCIES_FIELD_VALUE = ErrorDefinition(0x05, 'dependencies')
EXCLUDES_FIELD = ErrorDefinition(0x06, 'excludes')
# shape (codes 0x21 - 0x28)
DOCUMENT_FORMAT = ErrorDefinition(0x21, None) # issues/141
# NOTE: as with DOCUMENT_MISSING, the name is rebound to a message template
# (one positional placeholder) right after the ErrorDefinition is created.
DOCUMENT_FORMAT = "'{0}' is not a document, must be a dict"
EMPTY_NOT_ALLOWED = ErrorDefinition(0x22, 'empty')
NOT_NULLABLE = ErrorDefinition(0x23, 'nullable')
BAD_TYPE = ErrorDefinition(0x24, 'type')
BAD_TYPE_FOR_SCHEMA = ErrorDefinition(0x25, 'schema')
ITEMS_LENGTH = ErrorDefinition(0x26, 'items')
MIN_LENGTH = ErrorDefinition(0x27, 'minlength')
MAX_LENGTH = ErrorDefinition(0x28, 'maxlength')
# color (codes 0x41 - 0x48): rules that constrain the content of a value
REGEX_MISMATCH = ErrorDefinition(0x41, 'regex')
MIN_VALUE = ErrorDefinition(0x42, 'min')
MAX_VALUE = ErrorDefinition(0x43, 'max')
UNALLOWED_VALUE = ErrorDefinition(0x44, 'allowed')
UNALLOWED_VALUES = ErrorDefinition(0x45, 'allowed')
FORBIDDEN_VALUE = ErrorDefinition(0x46, 'forbidden')
FORBIDDEN_VALUES = ErrorDefinition(0x47, 'forbidden')
MISSING_MEMBERS = ErrorDefinition(0x48, 'contains')
# other (codes 0x60 - 0x64): NORMALIZATION.code is the mask that
# ``ValidationError.is_normalization_error`` applies to an error's code
NORMALIZATION = ErrorDefinition(0x60, None)
COERCION_FAILED = ErrorDefinition(0x61, 'coerce')
RENAMING_FAILED = ErrorDefinition(0x62, 'rename_handler')
READONLY_FIELD = ErrorDefinition(0x63, 'readonly')
SETTING_DEFAULT_FAILED = ErrorDefinition(0x64, 'default_setter')
# groups (codes from 0x80): ``ValidationError.is_group_error`` tests an
# error's code against ERROR_GROUP.code
ERROR_GROUP = ErrorDefinition(0x80, None)
MAPPING_SCHEMA = ErrorDefinition(0x81, 'schema')
SEQUENCE_SCHEMA = ErrorDefinition(0x82, 'schema')
# TODO remove KEYSCHEMA AND VALUESCHEMA with next major release
# Each pair of names below is bound to one and the same ErrorDefinition.
KEYSRULES = KEYSCHEMA = ErrorDefinition(0x83, 'keysrules')
VALUESRULES = VALUESCHEMA = ErrorDefinition(0x84, 'valuesrules')
BAD_ITEMS = ErrorDefinition(0x8F, 'items')
# logical (codes from 0x90): ``ValidationError.is_logic_error`` tests the bit
# ``LOGICAL.code - ERROR_GROUP.code`` (0x10) of an error's code
LOGICAL = ErrorDefinition(0x90, None)
NONEOF = ErrorDefinition(0x91, 'noneof')
ONEOF = ErrorDefinition(0x92, 'oneof')
ANYOF = ErrorDefinition(0x93, 'anyof')
ALLOF = ErrorDefinition(0x94, 'allof')
""" SchemaError messages """
# Plain message strings; the first one takes the field name as ``{0}``.
SCHEMA_ERROR_DEFINITION_TYPE = "schema definition for field '{0}' must be a dict"
SCHEMA_ERROR_MISSING = "validation schema missing"
""" Error representations """
class ValidationError(object):
    """A simple class to store and query basic error information."""

    def __init__(self, document_path, schema_path, code, rule, constraint, value, info):
        self.document_path = document_path
        """ The path to the field within the document that caused the error.
            Type: :class:`tuple` """
        self.schema_path = schema_path
        """ The path to the rule within the schema that caused the error.
            Type: :class:`tuple` """
        self.code = code
        """ The error's identifier code. Type: :class:`int` """
        self.rule = rule
        """ The rule that failed. Type: `string` """
        self.constraint = constraint
        """ The constraint that failed. """
        self.value = value
        """ The value that failed. """
        self.info = info
        """ May hold additional information about the error.
            Type: :class:`tuple` """

    def __eq__(self, other):
        """Assumes the errors relate to the same document and schema."""
        # Equality is delegated to the hash, i.e. only the document path,
        # the schema path and the code take part in the comparison.
        return hash(self) == hash(other)

    def __hash__(self):
        """Expects that all other properties are transitively determined."""
        return hash(self.document_path) ^ hash(self.schema_path) ^ hash(self.code)

    def __lt__(self, other):
        """Order errors by document path, falling back to the schema path."""
        if self.document_path != other.document_path:
            return compare_paths_lt(self.document_path, other.document_path)
        else:
            return compare_paths_lt(self.schema_path, other.schema_path)

    def __repr__(self):
        return (
            "{class_name} @ {memptr} ( "
            "document_path={document_path},"
            "schema_path={schema_path},"
            "code={code},"
            "constraint={constraint},"
            "value={value},"
            "info={info} )".format(
                class_name=self.__class__.__name__,
                memptr=hex(id(self)),  # noqa: E501
                document_path=self.document_path,
                schema_path=self.schema_path,
                code=hex(self.code),
                constraint=quote_string(self.constraint),
                value=quote_string(self.value),
                info=self.info,
            )
        )

    @property
    def child_errors(self):
        """
        A list that contains the individual errors of a bulk validation error.
        It is taken from the first item of :attr:`info`; :obj:`None` is
        returned for errors that are not group errors.
        """
        return self.info[0] if self.is_group_error else None

    @property
    def definitions_errors(self):
        r"""
        Dictionary with errors of an \*of-rule mapped to the index of the definition it
        occurred in. Returns :obj:`None` if not applicable.
        """
        if not self.is_logic_error:
            return None
        result = defaultdict(list)
        for error in self.child_errors:
            # The schema path element right after this error's own schema
            # path is used as the index of the definition.
            i = error.schema_path[len(self.schema_path)]
            result[i].append(error)
        return result

    @property
    def field(self):
        """Field of the contextual mapping, possibly :obj:`None`."""
        if self.document_path:
            return self.document_path[-1]
        else:
            return None

    @property
    def is_group_error(self):
        """``True`` for errors of bulk validations."""
        return bool(self.code & ERROR_GROUP.code)

    @property
    def is_logic_error(self):
        r"""
        ``True`` for validation errors against different schemas with \*of-rules.
        """
        # ``-`` binds tighter than ``&``; the parentheses only make explicit
        # that the bit distinguishing LOGICAL from ERROR_GROUP is tested.
        return bool(self.code & (LOGICAL.code - ERROR_GROUP.code))

    @property
    def is_normalization_error(self):
        """``True`` for normalization errors."""
        # NORMALIZATION.code spans more than one bit, so a plain truthiness
        # test of the masked value would also match codes that only share
        # one of those bits (e.g. BAD_TYPE or REGEX_MISMATCH). All bits of
        # the mask have to be set.
        mask = NORMALIZATION.code
        return (self.code & mask) == mask
class ErrorList(list):
    """
    A :class:`list` of :class:`~cerberus.errors.ValidationError` instances.

    The ``in`` operator is redefined: it takes an
    :class:`~cerberus.errors.ErrorDefinition` and reports whether any contained
    error carries that definition's code.
    """

    def __contains__(self, error_definition):
        """
        Tell whether an error with the definition's code is in the list.

        :param error_definition: :class:`~cerberus.errors.ErrorDefinition`
        :raises TypeError: if anything but an error definition is passed.
        """
        if isinstance(error_definition, ErrorDefinition):
            code = error_definition.code
            for error in self:
                if error.code == code:
                    return True
            return False
        raise TypeError
class ErrorTreeNode(MutableMapping):
    """
    A node of an error tree.

    A node keeps the errors recorded for its own path in ``errors`` and its
    child nodes, keyed by the next path element, in ``descendants``. Item
    access with an :class:`ErrorDefinition` queries ``errors``; any other key
    addresses ``descendants``. Iteration and ``len`` refer to ``errors``.
    """

    __slots__ = ('descendants', 'errors', 'parent_node', 'path', 'tree_root')

    def __init__(self, path, parent_node):
        self.parent_node = parent_node
        self.tree_root = parent_node.tree_root
        # A node sits one level below its parent, hence its path is the
        # given path cut to the parent's depth plus one element.
        self.path = path[: parent_node.depth + 1]
        self.errors = ErrorList()
        self.descendants = {}

    def __contains__(self, item):
        looks_for_error = isinstance(item, ErrorDefinition)
        container = self.errors if looks_for_error else self.descendants
        return item in container

    def __delitem__(self, key):
        del self.descendants[key]

    def __iter__(self):
        return iter(self.errors)

    def __getitem__(self, item):
        if not isinstance(item, ErrorDefinition):
            return self.descendants.get(item)
        # First error with the wanted code, or None when there is none.
        return next((e for e in self.errors if item.code == e.code), None)

    def __len__(self):
        return len(self.errors)

    def __repr__(self):
        return self.__str__()

    def __setitem__(self, key, value):
        self.descendants[key] = value

    def __str__(self):
        return ','.join((str(self.errors), str(self.descendants)))

    @property
    def depth(self):
        """Number of elements in this node's path."""
        return len(self.path)

    @property
    def tree_type(self):
        """The ``tree_type`` of the tree's root."""
        return self.tree_root.tree_type

    def add(self, error):
        """
        File an error below this node, creating child nodes as needed.

        When the error's path ends at the direct child, the error is appended
        to that child's sorted error list and the child errors of a group
        error are added through the tree's root. Otherwise the call is passed
        on to the child node.
        """
        full_path = self._path_of_(error)
        child_depth = self.depth + 1
        key = full_path[self.depth]
        if key not in self.descendants:
            self[key] = ErrorTreeNode(full_path, self)
        child = self[key]
        if len(full_path) != child_depth:
            child.add(error)
            return
        child.errors.append(error)
        child.errors.sort()
        if error.is_group_error:
            for nested in error.child_errors:
                self.tree_root.add(nested)

    def _path_of_(self, error):
        """Return the error's path attribute that matches the tree's type."""
        attribute = self.tree_type + '_path'
        return getattr(error, attribute)
class ErrorTree(ErrorTreeNode):
    """
    Root node of an error tree; the common base of
    :class:`~cerberus.errors.DocumentErrorTree` and
    :class:`~cerberus.errors.SchemaErrorTree`.
    """

    def __init__(self, errors=()):
        # The root has no parent, is its own tree root and has an empty path.
        self.path = ()
        self.parent_node = None
        self.tree_root = self
        self.descendants = {}
        self.errors = ErrorList()
        for error in errors:
            self.add(error)

    def add(self, error):
        """
        Add an error to the tree.

        Errors with an empty path are kept in the root's own, sorted error
        list; all others are filed into the descendant nodes.

        :param error: :class:`~cerberus.errors.ValidationError`
        """
        if self._path_of_(error):
            super(ErrorTree, self).add(error)
            return
        self.errors.append(error)
        self.errors.sort()

    def fetch_errors_from(self, path):
        """
        Returns all errors for a particular path; the list is empty when the
        tree has no node for that path.

        :param path: :class:`tuple` of :term:`hashable` s.
        :rtype: :class:`~cerberus.errors.ErrorList`
        """
        node = self.fetch_node_from(path)
        return ErrorList() if node is None else node.errors

    def fetch_node_from(self, path):
        """
        Returns the node for a path, walking down from the root key by key.

        :param path: Tuple of :term:`hashable` s.
        :rtype: :class:`~cerberus.errors.ErrorTreeNode` or :obj:`None`
        """
        node = self
        for key in path:
            node = node[key]
            if node is None:
                # The path leaves the tree, there is nothing to return.
                return None
        return node
class DocumentErrorTree(ErrorTree):
    """
    Implements a dict-like class to query errors by indexes following the structure of a
    validated document.
    """

    # Makes the tree's nodes file errors by their ``document_path`` attribute
    # (the attribute name is built as ``tree_type + '_path'``).
    tree_type = 'document'
class SchemaErrorTree(ErrorTree):
    """
    Implements a dict-like class to query errors by indexes following the structure of
    the used schema.
    """

    # Makes the tree's nodes file errors by their ``schema_path`` attribute
    # (the attribute name is built as ``tree_type + '_path'``).
    tree_type = 'schema'
class BaseErrorHandler(object):
    """
    Common base of all error handlers.

    An object is recognized as error handler by being an instance of a
    subclass. :meth:`__call__`, :meth:`__iter__` and :meth:`add` must be
    provided by subclasses; the remaining hooks do nothing unless overridden.
    """

    def __init__(self, *args, **kwargs):
        """Accepts and ignores any arguments; subclasses may make use of them."""

    def __call__(self, errors):
        """
        Returns errors in a handler-specific format.

        :param errors: An object containing the errors.
        :type errors: :term:`iterable` of
                      :class:`~cerberus.errors.ValidationError` instances or a
                      :class:`~cerberus.Validator` instance
        """
        raise NotImplementedError

    def __iter__(self):
        """Iterates over the handled errors; to be implemented by subclasses."""
        raise NotImplementedError

    def add(self, error):
        """
        Puts a single error into the handler's container object.

        :param error: The error to add.
        :type error: :class:`~cerberus.errors.ValidationError`
        """
        raise NotImplementedError

    def emit(self, error):
        """
        Hook to emit an error in the handler's format, e.g. to a stream. Does
        nothing by default.

        :param error: The error to emit.
        :type error: :class:`~cerberus.errors.ValidationError`
        """

    def end(self, validator):
        """
        Hook that gets called when a validation ends. Does nothing by default.

        :param validator: The calling validator.
        :type validator: :class:`~cerberus.Validator`
        """

    def extend(self, errors):
        """
        Passes every one of the given errors to :meth:`add`, in order.

        :param errors: The errors to add.
        :type errors: :term:`iterable` of
                      :class:`~cerberus.errors.ValidationError` instances
        """
        for item in errors:
            self.add(item)

    def start(self, validator):
        """
        Hook that gets called when a validation starts. Does nothing by default.

        :param validator: The calling validator.
        :type validator: :class:`~cerberus.Validator`
        """
class ToyErrorHandler(BaseErrorHandler):
    """An error handler stub: calling it is an error, clearing it does nothing."""

    def __call__(self, *args, **kwargs):
        """Always raises :class:`RuntimeError`, whatever arguments are given."""
        raise RuntimeError('This is not supposed to happen.')

    def clear(self):
        """Does nothing; this class keeps no state of its own to reset."""
        pass
def encode_unicode(f):
    """
    Decorator for functions that are called with ``(obj, error)``.

    On Python 3 the function is returned as is. On legacy Python a wrapper is
    returned that works on a shallow copy of the error whose
    ``document_path``, ``schema_path``, ``constraint``, ``value`` and ``info``
    attributes got encoded to UTF-8 binary strings where they were unicode
    strings. Values of other types are passed on unchanged.
    """
    if not sys.version_info < (3,):
        return f

    encoded_attributes = ('document_path', 'schema_path', 'constraint', 'value', 'info')

    def _encode(value):
        """Helper encoding unicode strings into binary utf-8"""
        if isinstance(value, unicode):  # noqa: F821
            return value.encode('utf-8')
        return value

    @wraps(f)
    def wrapped(obj, error):
        # The caller's error object must stay untouched, work on a copy.
        error = copy(error)
        for name in encoded_attributes:
            setattr(error, name, _encode(getattr(error, name)))
        return f(obj, error)

    return wrapped
class BasicErrorHandler(BaseErrorHandler):
    """
    Models cerberus' legacy. Returns a :class:`dict`. When mangled through :class:`str`
    a pretty-formatted representation of that tree is returned.

    The internal ``tree`` maps a field to a list of error messages whose last
    item is always a dictionary holding the sub-tree for that field.
    """

    # Message templates by error code. They are rendered with ``str.format``:
    # an error's ``info`` items are the positional arguments, ``constraint``,
    # ``field`` and ``value`` are available as keywords.
    # NOTE(review): there is an entry for 0x85 but none for BAD_ITEMS (0x8F);
    # no error definition visible in this module uses 0x85 - confirm intent.
    messages = {
        0x00: "{0}",
        0x01: "document is missing",
        0x02: "required field",
        0x03: "unknown field",
        0x04: "field '{0}' is required",
        0x05: "depends on these values: {constraint}",
        0x06: "{0} must not be present with '{field}'",
        0x21: "'{0}' is not a document, must be a dict",
        0x22: "empty values not allowed",
        0x23: "null value not allowed",
        0x24: "must be of {constraint} type",
        0x25: "must be of dict type",
        0x26: "length of list should be {0}, it is {1}",
        0x27: "min length is {constraint}",
        0x28: "max length is {constraint}",
        0x41: "value does not match regex '{constraint}'",
        0x42: "min value is {constraint}",
        0x43: "max value is {constraint}",
        0x44: "unallowed value {value}",
        0x45: "unallowed values {0}",
        0x46: "unallowed value {value}",
        0x47: "unallowed values {0}",
        0x48: "missing members {0}",
        0x61: "field '{field}' cannot be coerced: {0}",
        0x62: "field '{field}' cannot be renamed: {0}",
        0x63: "field is read-only",
        0x64: "default value for '{field}' cannot be set: {0}",
        0x81: "mapping doesn't validate subschema: {0}",
        0x82: "one or more sequence-items don't validate: {0}",
        0x83: "one or more keys of a mapping don't validate: {0}",
        0x84: "one or more values in a mapping don't validate: {0}",
        0x85: "one or more sequence-items don't validate: {0}",
        0x91: "one or more definitions validate",
        0x92: "none or more than one rule validate",
        0x93: "no definitions validate",
        0x94: "one or more definitions don't validate",
    }

    def __init__(self, tree=None):
        """
        :param tree: An existing error tree to start with; a new, empty
                     dictionary is used when omitted.
        """
        self.tree = {} if tree is None else tree

    def __call__(self, errors):
        """
        Discards the current tree, adds all given errors and returns the
        resulting :attr:`pretty_tree`.
        """
        self.clear()
        self.extend(errors)
        return self.pretty_tree

    def __str__(self):
        return pformat(self.pretty_tree)

    @property
    def pretty_tree(self):
        """
        A deep copy of the tree where empty dictionaries at the end of the
        message lists are removed.
        """
        pretty = deepcopy(self.tree)
        for field in pretty:
            self._purge_empty_dicts(pretty[field])
        return pretty

    @encode_unicode
    def add(self, error):
        """
        Adds the message(s) for an error to the tree.

        Logic and group errors are handed to their dedicated insert methods;
        any other error is only inserted if there is a message template for
        its code, otherwise it is ignored.
        """
        # Make sure the original error is not altered with
        # error paths specific to the handler.
        error = deepcopy(error)
        self._rewrite_error_path(error)
        if error.is_logic_error:
            self._insert_logic_error(error)
        elif error.is_group_error:
            self._insert_group_error(error)
        elif error.code in self.messages:
            self._insert_error(
                error.document_path, self._format_message(error.field, error)
            )

    def clear(self):
        """Replaces the tree with a new, empty dictionary."""
        self.tree = {}

    def start(self, validator):
        """Clears the tree; the ``validator`` argument is not used."""
        self.clear()

    def _format_message(self, field, error):
        """
        Renders the message template for the error's code.

        :param field: Value for the ``{field}`` placeholder.
        :param error: The error that provides ``code``, ``info``,
                      ``constraint`` and ``value``.
        :raises KeyError: if there is no template for the error's code.
        """
        return self.messages[error.code].format(
            *error.info, constraint=error.constraint, field=field, value=error.value
        )

    def _insert_error(self, path, node):
        """
        Adds an error or sub-tree to :attr:`tree`.

        :param path: Path to the error.
        :type path: Tuple of strings and integers.
        :param node: An error message or a sub-tree.
        :type node: String or dictionary.
        """
        field = path[0]
        if len(path) == 1:
            if field in self.tree:
                # Keep the sub-tree dictionary as the last item of the list:
                # take it off, then re-append it after the new node.
                subtree = self.tree[field].pop()
                self.tree[field] += [node, subtree]
            else:
                self.tree[field] = [node, {}]
        elif len(path) >= 1:
            if field not in self.tree:
                self.tree[field] = [{}]
            subtree = self.tree[field][-1]
            # The rest of the path is inserted by a temporary handler of the
            # same class, whose tree is then merged into the sub-tree.
            if subtree:
                new = self.__class__(tree=copy(subtree))
            else:
                new = self.__class__()
            new._insert_error(path[1:], node)
            subtree.update(new.tree)

    def _insert_group_error(self, error):
        """
        Inserts the child errors of a group error, descending into nested
        logic and group errors. No message is inserted for the group error
        itself.
        """
        for child_error in error.child_errors:
            if child_error.is_logic_error:
                self._insert_logic_error(child_error)
            elif child_error.is_group_error:
                self._insert_group_error(child_error)
            else:
                self._insert_error(
                    child_error.document_path,
                    self._format_message(child_error.field, child_error),
                )

    def _insert_logic_error(self, error):
        """
        Inserts the message of a logic error followed by the errors of its
        definitions, descending into nested logic and group errors.
        """
        field = error.field
        self._insert_error(error.document_path, self._format_message(field, error))
        for definition_errors in error.definitions_errors.values():
            for child_error in definition_errors:
                if child_error.is_logic_error:
                    self._insert_logic_error(child_error)
                elif child_error.is_group_error:
                    self._insert_group_error(child_error)
                else:
                    # Plain child messages are formatted with the field of
                    # the logic error, not with the child error's field.
                    self._insert_error(
                        child_error.document_path,
                        self._format_message(field, child_error),
                    )

    def _purge_empty_dicts(self, error_list):
        """
        Removes the last item of the list if it is empty; otherwise treats it
        as a sub-tree and applies the same to each of its message lists.
        """
        subtree = error_list[-1]
        if not error_list[-1]:
            error_list.pop()
        else:
            for key in subtree:
                self._purge_empty_dicts(subtree[key])

    def _rewrite_error_path(self, error, offset=0):
        """
        Recursively rewrites the error path to correctly represent logic errors

        :param error: The error whose child errors' document paths are
                      rewritten in place; errors that are neither logic nor
                      group errors are left untouched.
        :param offset: Number of path elements that enclosing logic errors
                       have inserted so far.
        """
        if error.is_logic_error:
            self._rewrite_logic_error_path(error, offset)
        elif error.is_group_error:
            self._rewrite_group_error_path(error, offset)

    def _rewrite_group_error_path(self, error, offset=0):
        """
        Re-roots the document paths of a group error's child errors at the
        group error's own document path and recurses into each child.
        """
        # Index within a child's path where its part relative to this error
        # begins.
        child_start = len(error.document_path) - offset
        for child_error in error.child_errors:
            relative_path = child_error.document_path[child_start:]
            child_error.document_path = error.document_path + relative_path
            self._rewrite_error_path(child_error, offset)

    def _rewrite_logic_error_path(self, error, offset=0):
        """
        Inserts a ``'<rule> definition <index>'`` element into the document
        paths of a logic error's child errors, right after the logic error's
        own document path, and recurses into each child.
        """
        child_start = len(error.document_path) - offset
        for i, definition_errors in error.definitions_errors.items():
            if not definition_errors:
                continue
            nodename = '%s definition %s' % (error.rule, i)
            path = error.document_path + (nodename,)
            for child_error in definition_errors:
                rel_path = child_error.document_path[child_start:]
                child_error.document_path = path + rel_path
                # One more element was inserted, hence the increased offset.
                self._rewrite_error_path(child_error, offset + 1)
class SchemaErrorHandler(BasicErrorHandler):
    """
    A :class:`BasicErrorHandler` with its own copy of the message templates
    in which code ``0x03`` reads "unknown rule".
    """

    # Work on a copy so that the templates of BasicErrorHandler stay as they are.
    messages = BasicErrorHandler.messages.copy()
    messages[0x03] = "unknown rule"