Source code for stelar.client.proxy.fieldvalidation

"""
Proxy field validation
----------------------

Proxied entity fields (attributes) have two representations: one
that appears in the entity (JSON object stored in a dict) and one that is used
by the proxy objects.

client proxy value  <-->  json entity value

For example, dates are represented as datetime objects in the proxy and as strings
in the JSON entity. Conversion between the two is done via the two conversion methods
of the classes herein.
"""

from __future__ import annotations

import json
import re
from collections.abc import MutableSequence
from datetime import datetime
from typing import Any, Mapping, Optional
from uuid import UUID

from frozendict import deepfreeze

# Public API of this module: the names exported by ``from ... import *``.
# NOTE(review): JSONField and LicenseNameField are defined in this module but
# are not listed here -- confirm whether that omission is intentional.
__all__ = [
    "FieldValidator",
    "AnyField",
    "FrozenField",
    "EnumeratedField",
    "StateField",
    "BoolField",
    "IntField",
    "StrField",
    "DateField",
    "DateFmtField",
    "DictField",
    "ListField",
    "BasicField",
    "ExecStateField",
    "UUIDField",
    "NameField",
    "TagNameField",
    "VocabNameField",
]


class FieldValidator:
    """Provide simple validation and conversion for entity fields.

    A validation is a sequence of checks. Each check can:

    - Raise an exception of ValueError
    - Apply a value conversion and continue
    - Apply a value conversion and terminate

    Any function that takes as input a value, and returns a pair
    ``(newvalue, done)`` where ``done`` is a boolean, can be used as a check.

    At the end of all conversions, if no conversion signaled ``done``, the
    ``strict`` attribute is checked. If True, an error is raised, else
    (the default) conversion succeeds.
    """

    def __init__(
        self,
        *,
        strict: bool = False,
        nullable: bool = True,
        default: Any = ...,
        minimum_value: Any = None,
        maximum_value: Any = None,
        maximum_len: Optional[int] = None,
        minimum_len: Optional[int] = None,
    ):
        """Configure the validator and register the built-in checks.

        Args:
            strict: if True, ``validate`` fails unless some check reports
                ``done``.
            nullable: whether ``None`` is an accepted value.
            default: default value; the ``default`` attribute is only set
                when something other than ``...`` (Ellipsis) is passed.
            minimum_value: inclusive lower bound, or None for no bound.
            maximum_value: inclusive upper bound, or None for no bound.
            maximum_len: inclusive upper bound on ``len(value)``, or None.
            minimum_len: inclusive lower bound on ``len(value)``, or None.
        """
        self.prioritized_checks = []
        self.checks = []
        self.strict = strict
        self.nullable = nullable
        # Ellipsis is the "no default given" sentinel, so that None itself
        # can be used as a default value.
        if default is not ...:
            self.default = default
        self.minimum_value = minimum_value
        self.maximum_value = maximum_value
        self.maximum_len = maximum_len
        self.minimum_len = minimum_len

        # The null check has the lowest priority number, so it runs first.
        # It is registered for both True and False; only an explicit
        # nullable=None skips it.
        if nullable is not None:
            self.add_check(self.check_null, -1)
        if minimum_value is not None:
            self.add_check(self.check_minimum, 10)
        if maximum_value is not None:
            self.add_check(self.check_maximum, 12)
        if maximum_len is not None or minimum_len is not None:
            self.add_check(self.check_length, 20)

    def add_check(self, check_func, pri: int):
        """Register ``check_func`` to run at priority ``pri``.

        Checks run in ascending priority order. The sort is stable, so checks
        with equal priority run in registration order.
        """
        self.prioritized_checks.append((check_func, pri))
        self.prioritized_checks.sort(key=lambda p: p[1])
        self.checks = [p[0] for p in self.prioritized_checks]

    def check_null(self, value, **kwargs):
        """Accept ``None`` (and stop validating) if the field is nullable.

        Raises:
            ValueError: if ``value`` is None and the field is not nullable.
        """
        if value is None:
            if self.nullable:
                return None, True
            raise ValueError("None is not allowed")
        return value, False

    def check_length(self, value, **kwargs):
        """Check ``len(value)`` against the configured length bounds.

        Raises:
            ValueError: if the length is outside the inclusive bounds.
        """
        value_len = len(value)
        if self.minimum_len is not None and value_len < self.minimum_len:
            raise ValueError(
                f"The length ({value_len}) is less than the minimum ({self.minimum_len})"
            )
        if self.maximum_len is not None and value_len > self.maximum_len:
            raise ValueError(
                f"The length ({value_len}) is greater than the maximum ({self.maximum_len})"
            )
        return value, False

    def check_minimum(self, value, **kwargs):
        """Raise ValueError if ``value`` is below ``minimum_value``."""
        if value < self.minimum_value:
            raise ValueError(f"Value ({value}) too low (minimum={self.minimum_value})")
        return value, False

    def check_maximum(self, value, **kwargs):
        """Raise ValueError if ``value`` is above ``maximum_value``."""
        if value > self.maximum_value:
            raise ValueError(f"Value ({value}) too high (maximum={self.maximum_value})")
        return value, False

    def validate(self, value, **kwargs):
        """Run all registered checks on ``value`` and return the result.

        Each check receives the value produced by the previous one, plus
        ``kwargs``. Validation stops early when a check reports ``done``.

        Raises:
            ValueError: if a check rejects the value, if a check raises any
                other exception (chained as the cause), or if the validator
                is strict and no check reported ``done``.
        """
        try:
            for check in self.checks:
                value, done = check(value, **kwargs)
                if done:
                    return value
        except ValueError:
            raise
        except Exception as e:
            # Normalize unexpected failures (e.g. TypeError from a
            # comparison) so that callers only have to handle ValueError.
            raise ValueError("Bad value found during validation") from e

        if self.strict:
            raise ValueError("Validation failed to match input value")
        return value

    def convert_to_proxy(self, value, **kwargs):
        """Convert an entity (JSON) value to its proxy form (abstract)."""
        raise NotImplementedError()

    def convert_to_entity(self, value, **kwargs):
        """Convert a proxy value to its entity (JSON) form (abstract)."""
        raise NotImplementedError()

    def default_value(self):
        """Return the default value of the field (abstract)."""
        raise NotImplementedError

    def repr_constraints(self):
        """Return a list of strings describing the configured constraints.

        The first entry is the nullability; it is followed by at most one
        entry for the length bounds and one for the value bounds.
        """
        nn = ["nullable" if self.nullable else "not null"]

        if self.minimum_len is not None and self.maximum_len is not None:
            nn.append(f"{self.minimum_len} <= length <= {self.maximum_len}")
        elif self.minimum_len is not None:
            nn.append(f"{self.minimum_len} <= length")
        elif self.maximum_len is not None:
            nn.append(f"length <= {self.maximum_len}")

        if self.minimum_value is not None and self.maximum_value is not None:
            nn.append(f"{self.minimum_value} <= value <= {self.maximum_value}")
        elif self.minimum_value is not None:
            nn.append(f"{self.minimum_value} <= value")
        elif self.maximum_value is not None:
            nn.append(f"value <= {self.maximum_value}")

        return nn
class AnyField(FieldValidator):
    """Pass-through validator: values are identical in proxy and entity form.

    Adds no checks of its own beyond those configured through the
    ``FieldValidator`` options.
    """

    def __init__(self, repr_type="Any", *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Label returned by repr_type().
        self._repr_type = repr_type

    def convert_to_proxy(self, value, **kwargs):
        """Return ``value`` unchanged."""
        return value

    def convert_to_entity(self, value, **kwargs):
        """Return ``value`` unchanged."""
        return value

    def default_value(self):
        """Return the configured default, else None for nullable fields.

        Raises:
            NotImplementedError: if no default was configured and the field
                is not nullable.
        """
        # The ``default`` attribute only exists when a default was supplied
        # at construction time.
        try:
            return self.default
        except AttributeError:
            pass
        if self.nullable:
            return None
        raise NotImplementedError()

    def repr_type(self):
        """Return the type label of this field."""
        return self._repr_type
class JSONField(AnyField):
    """Validator for values that can be serialized to JSON."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.add_check(self.check_json, 5)

    def check_json(self, value, **kwargs):
        """Validation stage: reject values that ``json.dumps`` cannot encode.

        The value itself is passed on unchanged; the serialized text is
        discarded.
        """
        try:
            json.dumps(value)
        except (TypeError, ValueError) as err:
            raise ValueError("Value is not JSON-serializable") from err
        else:
            return value, False

    def repr_type(self):
        """Return the type label of this field."""
        return "JSON"

    def convert_to_proxy(self, value, **kwargs):
        """Return ``value`` unchanged."""
        return value

    def convert_to_entity(self, value, **kwargs):
        """Return ``value`` unchanged."""
        return value
class EnumeratedField(AnyField):
    """Fields with a fixed set of legal values.

    Subclasses can redefine the VALUES class attribute.
    """

    VALUES = []

    def __init__(self, *args, **kwargs):
        # Positional arguments are accepted but not forwarded; only keyword
        # options reach the base class.
        super().__init__(**kwargs)
        self.add_check(self.oneof, 5)

    def oneof(self, value, **kwargs):
        """Validation stage: accept ``value`` only if it is in ``VALUES``.

        ``**kwargs`` is accepted (and ignored) because ``validate`` forwards
        its keyword arguments to every check; without it, any keyword passed
        to ``validate`` would make this check fail with a TypeError.

        Raises:
            ValueError: if ``value`` is not one of ``VALUES``.
        """
        if value not in self.VALUES:
            raise ValueError(f"Not one of {self.VALUES}")
        return value, False

    def repr_type(self):
        """Return the type label, which lists the legal values."""
        return f"OneOf{self.VALUES}"
class StateField(EnumeratedField):
    """Non-nullable enumerated field whose value is "active" or "deleted"."""

    VALUES = ["active", "deleted"]

    def __init__(self):
        # Takes no options: the field is always non-nullable.
        super().__init__(nullable=False)
class ExecStateField(EnumeratedField):
    """Non-nullable enumerated field: "running", "succeeded" or "failed"."""

    VALUES = ["running", "succeeded", "failed"]

    def __init__(self):
        # Takes no options: the field is always non-nullable.
        super().__init__(nullable=False)
class FrozenField(AnyField):
    """A field whose JSON-serializable value is deep-frozen by ``deepfreeze``."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.add_check(self.check_freeze, 5)

    def check_freeze(self, value, **kwargs):
        """Validation stage: check JSON-serializability, then freeze.

        Raises:
            ValueError: if ``json.dumps`` or ``deepfreeze`` raises TypeError
                or ValueError for the value.
        """
        try:
            # The serialized text is discarded; this only verifies that the
            # value can be encoded.
            json.dumps(value)
            frozen = deepfreeze(value)
        except (TypeError, ValueError) as err:
            raise ValueError("Value is not freezable") from err
        return frozen, False

    def convert_to_proxy(self, value, **kwargs):
        """Return the deep-frozen form of the entity value."""
        return deepfreeze(value)

    def repr_type(self):
        """Return the type label of this field."""
        return "frozen"
class BasicField(AnyField):
    """Validator that coerces values to a given type.

    Given ftype T, a value is accepted if it is an instance of T or if
    T(value) succeeds; in the latter case the converted value is used.
    Subclasses cover basic types such as str, int and bool.
    """

    def __init__(self, ftype, **kwargs):
        super().__init__(**kwargs)
        self.ftype = ftype
        self.add_check(self.to_ftype, 5)

    def to_ftype(self, value, **kwargs):
        """Validation stage: return ``value`` as an instance of ``ftype``."""
        target = self.ftype
        converted = value if isinstance(value, target) else target(value)
        return converted, False

    def repr_type(self):
        """Return the name of the target type."""
        return self.ftype.__name__
class StrField(BasicField):
    """Validator for string fields; other values are converted via ``str``."""

    def __init__(self, **kwargs):
        super().__init__(ftype=str, **kwargs)
class NameField(StrField):
    """Non-nullable string field whose value must match a name pattern.

    Names are between 2 and 100 characters long and must match
    ``NAME_PATTERN`` in full. Subclasses may override ``NAME_PATTERN``.
    """

    NAME_PATTERN = re.compile(r"[a-z0-9_-]+")

    def __init__(self, **kwargs):
        super().__init__(nullable=False, minimum_len=2, maximum_len=100, **kwargs)
        # Priority 7 places the pattern check after the str conversion (5)
        # and before the length check (20).
        self.add_check(self.check_name, 7)

    def check_name(self, value: str, **kwargs):
        """Validation stage: require a full match of ``NAME_PATTERN``.

        Raises:
            ValueError: if ``value`` does not match the pattern.
        """
        if self.NAME_PATTERN.fullmatch(value) is not None:
            return value, False
        raise ValueError(
            f"Name must be a string matching '{self.NAME_PATTERN.pattern}'"
        )
class VocabNameField(NameField):
    """Name field that accepts any non-empty run of non-newline characters."""

    NAME_PATTERN = re.compile(r".+")
class LicenseNameField(NameField):
    """Name field limited to lowercase letters, digits, '_' and '-'."""

    NAME_PATTERN = re.compile(r"[a-z0-9_-]+")
class TagNameField(NameField):
    """Name field allowing letters, digits, space, '.', '_' and '-'."""

    NAME_PATTERN = re.compile(r"[A-Za-z0-9 ._-]+")
class IntField(BasicField):
    """Validator for integer fields; other values are converted via ``int``."""

    def __init__(self, **kwargs):
        super().__init__(ftype=int, **kwargs)
class BoolField(BasicField):
    """Validator for boolean fields; other values are converted via ``bool``."""

    def __init__(self, **kwargs):
        super().__init__(ftype=bool, **kwargs)
class DateField(FieldValidator):
    """Date field: ``datetime`` in the proxy, ISO-format string in the entity."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.add_check(self.to_date, 5)

    def to_date(self, value: Any, **kwargs) -> tuple[datetime, bool]:
        """Validation stage: accept a datetime, or parse an ISO-format string.

        Raises:
            ValueError: if ``value`` is neither a datetime nor a string, or
                if the string cannot be parsed.
        """
        if isinstance(value, datetime):
            return value, False
        if isinstance(value, str):
            return datetime.fromisoformat(value), False
        raise ValueError("Invalid type, expected datetime or string")

    def convert_to_entity(self, value: datetime, **kwargs) -> str:
        """Return the ISO-format string for ``value``."""
        return value.isoformat()

    def convert_to_proxy(self, value: str, **kwargs) -> datetime:
        """Parse the ISO-format string ``value`` into a datetime."""
        return datetime.fromisoformat(value)

    def repr_type(self):
        """Return the type label of this field."""
        return "datetime"
class DateFmtField(FieldValidator):
    """Date field whose entity form is a string in a fixed ``strftime`` format."""

    def __init__(self, datefmt, **kwargs):
        super().__init__(**kwargs)
        # Format string used for both parsing and formatting.
        self.datefmt = datefmt
        self.add_check(self.to_date, 5)

    def to_date(self, value: Any, **kwargs) -> tuple[datetime, bool]:
        """Validation stage: accept a datetime, or parse a ``datefmt`` string.

        Raises:
            ValueError: if ``value`` is neither a datetime nor a string, or
                if the string does not match ``datefmt``.
        """
        if isinstance(value, datetime):
            return value, False
        if isinstance(value, str):
            return datetime.strptime(value, self.datefmt), False
        raise ValueError("Invalid type, expected datetime or string")

    def convert_to_entity(self, value: datetime, **kwargs) -> str:
        """Format ``value`` according to ``datefmt``."""
        return value.strftime(self.datefmt)

    def convert_to_proxy(self, value: str, **kwargs) -> datetime:
        """Parse ``value`` according to ``datefmt``."""
        return datetime.strptime(value, self.datefmt)

    def repr_type(self):
        """Return the type label of this field."""
        return "datetime"
class DictField(AnyField):
    """Mapping field with type-checked keys and values."""

    def __init__(self, key_type, value_type, **kwargs):
        super().__init__(**kwargs)
        self.key_type = key_type
        self.value_type = value_type
        # Replaces the default label set by AnyField.__init__.
        self._repr_type = f"Dict[{key_type},{value_type}]"
        self.add_check(self.check_dict, 5)

    def check_dict(self, value, **kwargs):
        """Validation stage: type-check a mapping and return it as a dict.

        Raises:
            TypeError: if ``value`` is not a Mapping, or if a key or value
                is not an instance of the configured type.
        """
        if not isinstance(value, Mapping):
            raise TypeError("Expected a dictionary")

        key_t, val_t = self.key_type, self.value_type
        for key, item in value.items():
            if not isinstance(key, key_t):
                raise TypeError(f"Invalid key type, expected {key_t}")
            if not isinstance(item, val_t):
                raise TypeError(f"Invalid value type, expected {val_t}")

        # Hand back a plain dict copy of the mapping.
        return dict(value), False
class ListField(AnyField):
    """List field with type-checked elements."""

    def __init__(self, element_type, **kwargs):
        super().__init__(**kwargs)
        self.element_type = element_type
        # Replaces the default label set by AnyField.__init__.
        self._repr_type = f"List[{element_type}]"
        self.add_check(self.check_list, 5)

    def check_list(self, value, **kwargs):
        """Validation stage: type-check a sequence and return it as a list.

        Raises:
            TypeError: if ``value`` is not a MutableSequence, or if an
                element is not an instance of ``element_type``.
        """
        if not isinstance(value, MutableSequence):
            raise TypeError("Expected a list")

        elem_t = self.element_type
        if not all(isinstance(item, elem_t) for item in value):
            raise TypeError(f"Invalid element type, expected {elem_t}")

        # Hand back a plain list copy of the sequence.
        return list(value), False

    def repr_type(self):
        """Return the type label of this field."""
        return self._repr_type
class UUIDField(BasicField):
    """UUID field: ``uuid.UUID`` in the proxy, string in the entity."""

    def __init__(self, **kwargs):
        super().__init__(ftype=UUID, **kwargs)

    def convert_to_proxy(self, value: str, **kwargs) -> UUID:
        """Parse the entity string ``value`` into a UUID.

        Raises:
            ValueError: if ``value`` is not a valid UUID string. The
                exception arguments carry the offending value and the
                keyword arguments of the call, and the original parsing
                error is attached as the cause.
        """
        try:
            return UUID(value)
        except ValueError as err:
            # Chain explicitly so the traceback shows the parse error as the
            # direct cause rather than as an error raised while handling it.
            raise ValueError("Invalid UUID", value, kwargs) from err

    def convert_to_entity(self, value: UUID, **kwargs) -> str:
        """Return the string form of ``value``."""
        return str(value)

    def repr_type(self):
        """Return the type label of this field."""
        return "UUID"