"""
Proxy field validation
----------------------
Proxied entity fields (attributes) have two representations: one
that appears in the entity (JSON object stored in a dict) and one that is used
by the proxy objects.
client proxy value <--> json entity value
For example, dates are represented as datetime objects in the proxy and as strings
in the JSON entity. Conversion between the two is done via the two conversion methods
of the classes herein.
"""
from __future__ import annotations
import json
import re
from collections.abc import MutableSequence
from datetime import datetime
from typing import Any, Mapping, Optional
from uuid import UUID
from frozendict import deepfreeze
__all__ = [
"FieldValidator",
"AnyField",
"FrozenField",
"EnumeratedField",
"StateField",
"BoolField",
"IntField",
"StrField",
"DateField",
"DateFmtField",
"DictField",
"ListField",
"BasicField",
"ExecStateField",
"UUIDField",
"NameField",
"TagNameField",
"VocabNameField",
]
[docs]
class FieldValidator:
"""Provide simple validation and conversion for entity fields.
A validation is a sequence of checks. Each check can:
- Raise an exception of ValueError
- Apply a value conversion and continue
- Apply a value conversion and terminate
Any function that takes as input a value, and returns a pair (newvalue, done)
where `done` is a boolean, can be used as a check.
At the end of all conversions, if no conversion signaled `done`, the `strict` attribute
is checked. If True, an error is raised, else (the default) conversion succeeds.
"""
def __init__(
self,
*,
strict: bool = False,
nullable: bool = True,
default: Any = ...,
minimum_value: Any = None,
maximum_value: Any = None,
maximum_len: Optional[int] = None,
minimum_len: Optional[int] = None,
):
self.prioritized_checks = []
self.checks = []
self.strict = strict
self.nullable = nullable
if default is not ...:
self.default = default
self.minimum_value = minimum_value
self.maximum_value = maximum_value
self.maximum_len = maximum_len
self.minimum_len = minimum_len
if nullable is not None:
self.add_check(self.check_null, -1)
if minimum_value is not None:
self.add_check(self.check_minimum, 10)
if maximum_value is not None:
self.add_check(self.check_maximum, 12)
if maximum_len is not None or minimum_len is not None:
self.add_check(self.check_length, 20)
[docs]
def add_check(self, check_func, pri: int):
self.prioritized_checks.append((check_func, pri))
self.prioritized_checks.sort(key=lambda p: p[1])
self.checks = [p[0] for p in self.prioritized_checks]
[docs]
def check_null(self, value, **kwargs):
if value is None:
if self.nullable:
return None, True
else:
raise ValueError("None is not allowed")
else:
return value, False
[docs]
def check_length(self, value, **kwargs):
value_len = len(value)
if self.minimum_len is not None and value_len < self.minimum_len:
raise ValueError(
f"The length ({value_len}) is less than the minimum ({self.minimum_len})"
)
if self.maximum_len is not None and value_len > self.maximum_len:
raise ValueError(
f"The length ({value_len}) is greater that the maximum ({self.maximum_len})"
)
return value, False
[docs]
def check_minimum(self, value, **kwargs):
if value < self.minimum_value:
raise ValueError(f"Value ({value}) too low (minimum={self.minimum_value})")
return value, False
[docs]
def check_maximum(self, value, **kwargs):
if value > self.maximum_value:
raise ValueError(f"Value ({value}) too high (maximum={self.maximum_value})")
return value, False
[docs]
def validate(self, value, **kwargs):
done = False
try:
for check in self.checks:
value, done = check(value, **kwargs)
if done:
return value
except ValueError:
raise
except Exception as e:
raise ValueError("Bad value found during validation") from e
if self.strict:
raise ValueError("Validation failed to match input value")
else:
return value
[docs]
def convert_to_proxy(self, value, **kwargs):
raise NotImplementedError()
[docs]
def convert_to_entity(self, value, **kwargs):
raise NotImplementedError()
[docs]
def default_value(self):
raise NotImplementedError
[docs]
def repr_constraints(self):
nn = ["nullable" if self.nullable else "not null"]
if self.minimum_len is not None and self.maximum_len is not None:
nn.append(f"{self.minimum_len} <= length <= {self.maximum_len}")
elif self.minimum_len is not None:
nn.append(f"{self.minimum_len} <= length")
elif self.maximum_len is not None:
nn.append(f"length <= {self.maximum_len}")
if self.minimum_value is not None and self.maximum_value is not None:
nn.append(f"{self.minimum_value} <= value <={self.maximum_value}")
elif self.minimum_value is not None:
nn.append(f"{self.minimum_value} <= value")
elif self.maximum_value is not None:
nn.append(f"value <= {self.maximum_value}")
return nn
[docs]
class AnyField(FieldValidator):
"""A very promiscuous basic validator."""
def __init__(self, repr_type="Any", *args, **kwargs):
super().__init__(*args, **kwargs)
self._repr_type = repr_type
[docs]
def convert_to_proxy(self, value, **kwargs):
return value
[docs]
def convert_to_entity(self, value, **kwargs):
return value
[docs]
def default_value(self):
if hasattr(self, "default"):
return self.default
elif self.nullable:
return None
else:
raise NotImplementedError()
[docs]
def repr_type(self):
return self._repr_type
[docs]
class JSONField(AnyField):
"""A field that accepts any JSON-serializable value."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.add_check(self.check_json, 5)
[docs]
def check_json(self, value, **kwargs):
"""Validator stage for JSONField"""
try:
json.dumps(value) # Check if value is JSON-serializable
except (TypeError, ValueError) as e:
raise ValueError("Value is not JSON-serializable") from e
return value, False
[docs]
def repr_type(self):
return "JSON"
[docs]
def convert_to_proxy(self, value, **kwargs):
return value
[docs]
def convert_to_entity(self, value, **kwargs):
return value
[docs]
class EnumeratedField(AnyField):
"""Fields with a fixed set of legal values.
Subclasses can redefine the VALUES class attribute.
"""
VALUES = []
def __init__(self, *args, **kwargs):
super().__init__(**kwargs)
self.add_check(self.oneof, 5)
[docs]
def oneof(self, value):
if value not in self.VALUES:
raise ValueError(f"Not one of {self.VALUES}")
return value, False
[docs]
def repr_type(self):
return f"OneOf{self.VALUES}"
[docs]
class StateField(EnumeratedField):
VALUES = ["active", "deleted"]
def __init__(self):
super().__init__(nullable=False)
[docs]
class ExecStateField(EnumeratedField):
VALUES = ["running", "succeeded", "failed"]
def __init__(self):
super().__init__(nullable=False)
[docs]
class FrozenField(AnyField):
"""A field that accepts any JSON-serializable value."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.add_check(self.check_freeze, 5)
[docs]
def check_freeze(self, value, **kwargs):
"""Validator stage for JSONField"""
try:
json.dumps(value) # Check if value is JSON-serializable
value = deepfreeze(value)
except (TypeError, ValueError) as e:
raise ValueError("Value is not freezable") from e
return value, False
[docs]
def convert_to_proxy(self, value, **kwargs):
return deepfreeze(value)
[docs]
def repr_type(self):
return "frozen"
[docs]
class BasicField(AnyField):
"""
Given ftype T, accept value if it is an instance of T or if T(value) succeeds.
Conversion to T is performed.
Subclasses include basic types: str, int, bool
"""
def __init__(self, ftype, **kwargs):
super().__init__(**kwargs)
self.ftype = ftype
self.add_check(self.to_ftype, 5)
[docs]
def to_ftype(self, value, **kwargs):
"""Validator stage for BasicField"""
if not isinstance(value, self.ftype):
value = self.ftype(value)
return value, False
[docs]
def repr_type(self):
return self.ftype.__name__
[docs]
class StrField(BasicField):
"""A string field validator"""
def __init__(self, **kwargs):
super().__init__(ftype=str, **kwargs)
[docs]
class NameField(StrField):
"""Name fields are non-nullable string fields whose value
must follow a pattern.
"""
def __init__(self, **kwargs):
super().__init__(nullable=False, minimum_len=2, maximum_len=100, **kwargs)
self.add_check(self.check_name, 7)
NAME_PATTERN = re.compile(r"[a-z0-9_-]+")
[docs]
def check_name(self, value: str, **kwargs):
if self.NAME_PATTERN.fullmatch(value) is None:
raise ValueError(
f"Name must be a string matching '{self.NAME_PATTERN.pattern}'"
)
return value, False
[docs]
class VocabNameField(NameField):
NAME_PATTERN = re.compile(r".+")
[docs]
class LicenseNameField(NameField):
NAME_PATTERN = re.compile(r"[a-z0-9_-]+")
[docs]
class TagNameField(NameField):
NAME_PATTERN = re.compile(r"[A-Za-z0-9 ._-]+")
[docs]
class IntField(BasicField):
"""An int field validator"""
def __init__(self, **kwargs):
super().__init__(ftype=int, **kwargs)
[docs]
class BoolField(BasicField):
"""A bool field validator"""
def __init__(self, **kwargs):
super().__init__(ftype=bool, **kwargs)
[docs]
class DateField(FieldValidator):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.add_check(self.to_date, 5)
[docs]
def to_date(self, value: Any, **kwargs) -> tuple[datetime, bool]:
"""Validation stage for dates."""
if isinstance(value, str):
return datetime.fromisoformat(value), False
elif isinstance(value, datetime):
return value, False
else:
raise ValueError("Invalid type, expected datetime or string")
[docs]
def convert_to_entity(self, value: datetime, **kwargs) -> str:
return value.isoformat()
[docs]
def convert_to_proxy(self, value: str, **kwargs) -> datetime:
return datetime.fromisoformat(value)
[docs]
def repr_type(self):
return "datetime"
[docs]
class DateFmtField(FieldValidator):
def __init__(self, datefmt, **kwargs):
super().__init__(**kwargs)
self.datefmt = datefmt
self.add_check(self.to_date, 5)
[docs]
def to_date(self, value: Any, **kwargs) -> tuple[datetime, bool]:
"""Validation stage for dates."""
if isinstance(value, str):
return datetime.strptime(value, self.datefmt), False
elif isinstance(value, datetime):
return value, False
else:
raise ValueError("Invalid type, expected datetime or string")
[docs]
def convert_to_entity(self, value: datetime, **kwargs) -> str:
return value.strftime(self.datefmt)
[docs]
def convert_to_proxy(self, value: str, **kwargs) -> datetime:
return datetime.strptime(value, self.datefmt)
[docs]
def repr_type(self):
return "datetime"
[docs]
class DictField(AnyField):
def __init__(self, key_type, value_type, **kwargs):
super().__init__(**kwargs)
self.key_type = key_type
self.value_type = value_type
self._repr_type = f"Dict[{key_type},{value_type}]"
self.add_check(self.check_dict, 5)
[docs]
def check_dict(self, value, **kwargs):
if not isinstance(value, Mapping):
raise TypeError("Expected a dictionary")
for k, v in value.items():
if not isinstance(k, self.key_type):
raise TypeError(f"Invalid key type, expected {self.key_type}")
if not isinstance(v, self.value_type):
raise TypeError(f"Invalid value type, expected {self.value_type}")
return dict(value), False
[docs]
class ListField(AnyField):
def __init__(self, element_type, **kwargs):
super().__init__(**kwargs)
self.element_type = element_type
self._repr_type = f"List[{element_type}]"
self.add_check(self.check_list, 5)
[docs]
def check_list(self, value, **kwargs):
if not isinstance(value, MutableSequence):
raise TypeError("Expected a list")
for v in value:
if not isinstance(v, self.element_type):
raise TypeError(f"Invalid element type, expected {self.element_type}")
return list(value), False
[docs]
def repr_type(self):
return self._repr_type
[docs]
class UUIDField(BasicField):
def __init__(self, **kwargs):
super().__init__(ftype=UUID, **kwargs)
[docs]
def convert_to_proxy(self, value: str, **kwargs) -> UUID:
try:
return UUID(value)
except ValueError:
raise ValueError("Invalid UUID", value, kwargs)
[docs]
def convert_to_entity(self, value: UUID, **kwargs) -> str:
return str(value)
[docs]
def repr_type(self):
return "UUID"