[PATCH 08 of 24 yams V2] [mypy] type constraints.py

Laurent Peuch cortex at worlddomination.be
Wed Mar 4 15:17:44 CET 2020


# HG changeset patch
# User Laurent Peuch <cortex at worlddomination.be>
# Date 1579692960 -3600
#      Wed Jan 22 12:36:00 2020 +0100
# Node ID bf3ca63b9f5c2f8e217ed4e65c119cc69a2402b4
# Parent  4c000e0d0baf31b6611e6869ab31c102e977772a
# Available At https://hg.logilab.org/users/lpeuch/yams
#              hg pull https://hg.logilab.org/users/lpeuch/yams -r bf3ca63b9f5c
# EXP-Topic type_annotations
[mypy] type constraints.py

diff --git a/yams/__init__.py b/yams/__init__.py
--- a/yams/__init__.py
+++ b/yams/__init__.py
@@ -28,7 +28,7 @@ from logilab.common.date import strptime
 from logilab.common import nullobject
 
 from yams._exceptions import SchemaError, UnknownType, BadSchemaDefinition, ValidationError  # noqa
-from yams.types import _RdefType
+from yams.types import _RdefRdefSchemaType
 
 __docformat__: str = "restructuredtext en"
 
@@ -91,7 +91,7 @@ KNOWN_METAATTRIBUTES: Set[str] = set(('f
 _type_of_default = TypeVar('_type_of_default')
 
 
-def convert_default_value(rdef: _RdefType,
+def convert_default_value(rdef: _RdefRdefSchemaType,
                           default: _type_of_default) -> Union[_type_of_default, datetime,
                                                               date, float, str, int, bool]:
     # rdef can be either a .schema.RelationDefinitionSchema
diff --git a/yams/constraints.py b/yams/constraints.py
--- a/yams/constraints.py
+++ b/yams/constraints.py
@@ -17,8 +17,6 @@
 # along with yams. If not, see <https://www.gnu.org/licenses/>.
 """Some common constraint classes."""
 
-__docformat__ = "restructuredtext en"
-
 import re
 import decimal
 import operator
@@ -26,62 +24,83 @@ import json
 import datetime
 import warnings
 
+from typing import Any, Dict, Match, Tuple, Type, Union, Optional, Callable, Sequence, List, cast
+
 from logilab.common.deprecation import class_renamed
 
 import yams
 from yams import BadSchemaDefinition
 from yams.interfaces import IConstraint, IVocabularyConstraint
+from yams.types import _jsonSerializableType, _RdefType, _SubjObjSchema, _RtypeType
 
-_ = str
+__docformat__: str = "restructuredtext en"
+
+_: Type[str] = str
 
 
 class ConstraintJSONEncoder(json.JSONEncoder):
-    def default(self, obj):
+    def default(self, obj: Union[Any, 'NOW', 'TODAY']) -> Union[Any, dict]:
         if isinstance(obj, Attribute):
             return {'__attribute__': obj.attr}
+
         if isinstance(obj, NOW):
-            d = obj.offset
-            if d is not None:
-                d = {'days': d.days,
-                     'seconds': d.seconds,
-                     'microseconds': d.microseconds}
+            # no offset (None, not a timedelta): serialize it unchanged
+            if obj.offset is None:
+                return {'__now__': True, 'offset': obj.offset}
+
+            d = {'days': obj.offset.days,
+                 'seconds': obj.offset.seconds,
+                 'microseconds': obj.offset.microseconds}
+
             return {'__now__': True, 'offset': d}
+
         if isinstance(obj, TODAY):
-            d = obj.offset
-            if d is not None:
-                d = {'days': d.days,
-                     'seconds': d.seconds,
-                     'microseconds': d.microseconds}
+            # no offset (None, not a timedelta): serialize it unchanged
+            if obj.offset is None:
+                return {'__today__': True, 'offset': obj.offset, 'type': obj.type}
+
+            d = {'days': obj.offset.days,
+                 'seconds': obj.offset.seconds,
+                 'microseconds': obj.offset.microseconds}
+
             return {'__today__': True, 'offset': d, 'type': obj.type}
+
         return super(ConstraintJSONEncoder, self).default(obj)
 
 
-def _json_object_hook(dct):
+def _json_object_hook(dct: Dict) -> Union[Dict, 'NOW', 'TODAY', 'Attribute']:
+    offset: Optional[datetime.timedelta]
+
     if '__attribute__' in dct:
         return Attribute(dct['__attribute__'])
+
     if '__now__' in dct:
         if dct['offset'] is not None:
             offset = datetime.timedelta(**dct['offset'])
         else:
             offset = None
+
         return NOW(offset)
+
     if '__today__' in dct:
         if dct['offset'] is not None:
             offset = datetime.timedelta(**dct['offset'])
         else:
             offset = None
+
         return TODAY(offset=offset, type=dct['type'])
+
     return dct
 
 
-def cstr_json_dumps(obj):
+def cstr_json_dumps(obj: _jsonSerializableType) -> str:
     return str(ConstraintJSONEncoder(sort_keys=True).encode(obj))
 
 
-cstr_json_loads = json.JSONDecoder(object_hook=_json_object_hook).decode
+cstr_json_loads: Callable[[str], Dict] = json.JSONDecoder(object_hook=_json_object_hook).decode
 
 
-def _message_value(boundary):
+def _message_value(boundary) -> Any:
     if isinstance(boundary, Attribute):
         return boundary.attr
     return boundary
@@ -91,57 +110,61 @@ class BaseConstraint(object):
     """base class for constraints"""
     __implements__ = IConstraint
 
-    def __init__(self, msg=None):
+    def __init__(self, msg=None) -> None:
         self.msg = msg
 
-    def check_consistency(self, subjschema, objschema, rdef):
+    def check_consistency(self, subjschema, objschema, rdef: _RdefType) -> None:
         pass
 
-    def type(self):
+    def type(self) -> str:
         return self.__class__.__name__
 
-    def serialize(self):
+    def serialize(self) -> str:
         """called to make persistent valuable data of a constraint"""
         return cstr_json_dumps({u'msg': self.msg})
 
     @classmethod
-    def deserialize(cls, value):
+    def deserialize(cls: Type['BaseConstraint'], value: str) -> Any:
         """called to restore serialized data of a constraint. Should return
         a `cls` instance
         """
         value = value.strip()
+
         if value and value != 'None':
             d = cstr_json_loads(value)
         else:
             d = {}
+
         return cls(**d)
 
-    def failed_message(self, key, value, entity=None):
+    def failed_message(self, key: str, value, entity=None) -> Tuple[Any, Dict[str, Any]]:
         if entity is None:
             warnings.warn('[yams 0.44] failed message '
                           'should now be given entity has argument.',
                           DeprecationWarning, stacklevel=2)
+
         if self.msg:
             return self.msg, {}
+
         return self._failed_message(entity, key, value)
 
-    def _failed_message(self, entity, key, value):
+    def _failed_message(self, entity, key: str, value) -> Tuple[str, Dict[str, Any]]:
         return _('%(KEY-cstr)s constraint failed for value %(KEY-value)r'), {
             key + '-cstr': self,
             key + '-value': value}
 
-    def __eq__(self, other):
+    def __eq__(self, other) -> bool:
         return (
             (self.type(), self.serialize()) == (other.type(), other.serialize())
         )
 
-    def __ne__(self, other):
+    def __ne__(self, other) -> bool:
         return not self == other
 
-    def __lt__(self, other):
+    def __lt__(self, other) -> bool:
         return NotImplemented
 
-    def __hash__(self):
+    def __hash__(self) -> int:
         return hash((self.type(), self.serialize()))
 
 
@@ -150,15 +173,19 @@ class BaseConstraint(object):
 class UniqueConstraint(BaseConstraint):
     """object of relation must be unique"""
 
-    def __str__(self):
+    def __str__(self) -> str:
         return 'unique'
 
-    def check_consistency(self, subjschema, objschema, rdef):
+    def check_consistency(self,
+                          subjschema: _SubjObjSchema,
+                          objschema: _SubjObjSchema,
+                          rdef: _RdefType) -> None:
+
         if not objschema.final:
             raise BadSchemaDefinition("unique constraint doesn't apply to non "
                                       "final entity type")
 
-    def check(self, entity, rtype, values):
+    def check(self, entity, rtype: _RtypeType, values) -> bool:
         """return true if the value satisfy the constraint, else false"""
         return True
 
@@ -170,31 +197,44 @@ class SizeConstraint(BaseConstraint):
     if min is not None the string length must not be shorter than min
     """
 
-    def __init__(self, max=None, min=None, msg=None):
+    def __init__(self, max: Optional[int] = None, min: Optional[int] = None, msg=None) -> None:
         super(SizeConstraint, self).__init__(msg)
+
         assert (max is not None or min is not None), "No max or min"
+
         if min is not None:
             assert isinstance(min, int), 'min must be an int, not %r' % min
+
         if max is not None:
             assert isinstance(max, int), 'max must be an int, not %r' % max
+
         self.max = max
         self.min = min
 
-    def __str__(self):
+    def __str__(self) -> str:
         res = 'size'
+
         if self.max is not None:
             res = '%s <= %s' % (res, self.max)
+
         if self.min is not None:
             res = '%s <= %s' % (self.min, res)
+
         return res
 
-    def check_consistency(self, subjschema, objschema, rdef):
+    def check_consistency(self,
+                          subjschema: _SubjObjSchema,
+                          objschema: _SubjObjSchema,
+                          rdef: _RdefType) -> None:
+
         if not objschema.final:
             raise BadSchemaDefinition("size constraint doesn't apply to non "
                                       "final entity type")
+
         if objschema not in ('String', 'Bytes', 'Password'):
             raise BadSchemaDefinition("size constraint doesn't apply to %s "
                                       "entity type" % objschema)
+
         if self.max:
             for cstr in rdef.constraints:
                 if cstr.__class__ is StaticVocabularyConstraint:
@@ -204,46 +244,55 @@ class SizeConstraint(BaseConstraint):
                                 'size constraint set to %s but vocabulary '
                                 'contains string of greater size' % self.max)
 
-    def check(self, entity, rtype, value):
+    def check(self, entity, rtype: _RtypeType, value: Sequence) -> bool:
         """return true if the value is in the interval specified by
         self.min and self.max
         """
         if self.max is not None and len(value) > self.max:
             return False
+
         if self.min is not None and len(value) < self.min:
             return False
+
         return True
 
-    def _failed_message(self, entity, key, value):
+    def _failed_message(self, entity, key: str, value: Sequence) -> Tuple[str, Dict[str, Any]]:
         if self.max is not None and len(value) > self.max:
             return _('value should have maximum size of %(KEY-max)s'
                      ' but found %(KEY-size)s'), {
                 key + '-max': self.max,
                 key + '-size': len(value)}
+
         if self.min is not None and len(value) < self.min:
             return _('value should have minimum size of %(KEY-min)s'
                      ' but found %(KEY-size)s'), {
                 key + '-min': self.min,
                 key + '-size': len(value)}
+
         assert False, 'shouldnt be there'
 
-    def serialize(self):
+    def serialize(self) -> str:
         """simple text serialization"""
         return cstr_json_dumps({u'min': self.min, u'max': self.max,
                                 u'msg': self.msg})
 
     @classmethod
-    def deserialize(cls, value):
+    def deserialize(cls: Type['SizeConstraint'], value: str) -> 'SizeConstraint':
         """simple text deserialization"""
         try:
             d = cstr_json_loads(value)
+
             return cls(**d)
         except ValueError:
             kwargs = {}
+
             for adef in value.split(','):
                 key, val = [w.strip() for w in adef.split('=')]
+
                 assert key in ('min', 'max')
+
                 kwargs[str(key)] = int(val)
+
             return cls(**kwargs)
 
 
@@ -251,7 +300,7 @@ class RegexpConstraint(BaseConstraint):
     """specifies a set of allowed patterns for a string value"""
     __implements__ = IConstraint
 
-    def __init__(self, regexp, flags=0, msg=None):
+    def __init__(self, regexp: str, flags: int = 0, msg=None) -> None:
         """
         Construct a new RegexpConstraint.
 
@@ -264,34 +313,36 @@ class RegexpConstraint(BaseConstraint):
         self.flags = flags
         self._rgx = re.compile(regexp, flags)
 
-    def __str__(self):
+    def __str__(self) -> str:
         return 'regexp %s' % self.regexp
 
-    def check_consistency(self, subjschema, objschema, rdef):
+    def check_consistency(self, subjschema: _SubjObjSchema, objschema: _SubjObjSchema,
+                          rdef: _RdefType) -> None:
         if not objschema.final:
             raise BadSchemaDefinition("regexp constraint doesn't apply to non "
                                       "final entity type")
+
         if objschema not in ('String', 'Password'):
             raise BadSchemaDefinition("regexp constraint doesn't apply to %s "
                                       "entity type" % objschema)
 
-    def check(self, entity, rtype, value):
+    def check(self, entity, rtype: _RtypeType, value: str) -> Optional[Match[str]]:
         """return true if the value maches the regular expression"""
         return self._rgx.match(value, self.flags)
 
-    def _failed_message(self, entity, key, value):
+    def _failed_message(self, entity, key: str, value) -> Tuple[str, Dict[str, Any]]:
         return _("%(KEY-value)r doesn't match "
                  "the %(KEY-regexp)r regular expression"), {
             key + '-value': value,
             key + '-regexp': self.regexp}
 
-    def serialize(self):
+    def serialize(self) -> str:
         """simple text serialization"""
         return cstr_json_dumps({u'regexp': self.regexp, u'flags': self.flags,
                                 u'msg': self.msg})
 
     @classmethod
-    def deserialize(cls, value):
+    def deserialize(cls, value: str) -> 'RegexpConstraint':
         """simple text deserialization"""
         try:
             d = cstr_json_loads(value)
@@ -300,11 +351,11 @@ class RegexpConstraint(BaseConstraint):
             regexp, flags = value.rsplit(',', 1)
             return cls(regexp, int(flags))
 
-    def __deepcopy__(self, memo):
+    def __deepcopy__(self, memo) -> 'RegexpConstraint':
         return RegexpConstraint(self.regexp, self.flags)
 
 
-OPERATORS = {
+OPERATORS: Dict[str, Callable[[Any, Any], bool]] = {
     '<=': operator.le,
     '<': operator.lt,
     '>': operator.gt,
@@ -319,47 +370,54 @@ class BoundaryConstraint(BaseConstraint)
     """
     __implements__ = IConstraint
 
-    def __init__(self, op, boundary=None, msg=None):
+    def __init__(self, op: str, boundary=None, msg=None) -> None:
         super(BoundaryConstraint, self).__init__(msg)
+
         assert op in OPERATORS, op
-        self.operator = op
+
+        self.operator: str = op
         self.boundary = boundary
 
-    def __str__(self):
+    def __str__(self) -> str:
         return 'value %s' % self.serialize()
 
-    def check_consistency(self, subjschema, objschema, rdef):
+    def check_consistency(self, subjschema: _SubjObjSchema, objschema: _SubjObjSchema,
+                          rdef: _RdefType) -> None:
         if not objschema.final:
             raise BadSchemaDefinition("bound constraint doesn't apply to non "
                                       "final entity type")
 
-    def check(self, entity, rtype, value):
+    def check(self, entity, rtype: _RtypeType, value) -> bool:
         """return true if the value satisfies the constraint, else false"""
         boundary = actual_value(self.boundary, entity)
+
         if boundary is None:
             return True
+
         return OPERATORS[self.operator](value, boundary)
 
-    def _failed_message(self, entity, key, value):
+    def _failed_message(self, entity, key: str, value) -> Tuple[str, Dict[str, Any]]:
         return "value %%(KEY-value)s must be %s %%(KEY-boundary)s" % self.operator, {
             key + '-value': value,
             key + '-boundary': _message_value(actual_value(self.boundary,
                                                            entity))}
 
-    def serialize(self):
+    def serialize(self) -> str:
         """simple text serialization"""
         return cstr_json_dumps({u'op': self.operator,
                                 u'boundary': self.boundary,
                                 u'msg': self.msg})
 
     @classmethod
-    def deserialize(cls, value):
+    def deserialize(cls: Type['BoundaryConstraint'], value: str) -> 'BoundaryConstraint':
         """simple text deserialization"""
         try:
             d = cstr_json_loads(value)
+
             return cls(**d)
         except ValueError:
             op, boundary = value.split(' ', 1)
+
             return cls(op, eval(boundary))
 
 
@@ -380,34 +438,45 @@ class IntervalBoundConstraint(BaseConstr
     """
     __implements__ = IConstraint
 
-    def __init__(self, minvalue=None, maxvalue=None, msg=None):
+    def __init__(self,
+                 minvalue: Optional[Union[int, float]] = None,
+                 maxvalue: Optional[Union[int, float]] = None,
+                 msg=None) -> None:
         """
         :param minvalue: the minimal value that can be used
         :param maxvalue: the maxvalue value that can be used
         """
         assert not (minvalue is None and maxvalue is None)
+
         super(IntervalBoundConstraint, self).__init__(msg)
+
         self.minvalue = minvalue
         self.maxvalue = maxvalue
 
-    def __str__(self):
+    def __str__(self) -> str:
         return 'value [%s]' % self.serialize()
 
-    def check_consistency(self, subjschema, objschema, rdef):
+    def check_consistency(self, subjschema: _SubjObjSchema, objschema: _SubjObjSchema,
+                          rdef: _RdefType) -> None:
+
         if not objschema.final:
             raise BadSchemaDefinition("interval bound constraint doesn't apply"
                                       " to non final entity type")
 
-    def check(self, entity, rtype, value):
+    def check(self, entity, rtype: _RtypeType, value: Union[int, float]) -> bool:
         minvalue = actual_value(self.minvalue, entity)
+
         if minvalue is not None and value < minvalue:
             return False
+
         maxvalue = actual_value(self.maxvalue, entity)
+
         if maxvalue is not None and value > maxvalue:
             return False
+
         return True
 
-    def _failed_message(self, entity, key, value):
+    def _failed_message(self, entity, key: str, value) -> Tuple[str, Dict[str, Any]]:
         if (
             self.minvalue is not None
             and value < actual_value(self.minvalue, entity)
@@ -415,6 +484,7 @@ class IntervalBoundConstraint(BaseConstr
             return _("value %(KEY-value)s must be >= %(KEY-boundary)s"), {
                 key + '-value': value,
                 key + '-boundary': _message_value(self.minvalue)}
+
         if (
             self.maxvalue is not None
             and value > actual_value(self.maxvalue, entity)
@@ -422,22 +492,25 @@ class IntervalBoundConstraint(BaseConstr
             return _("value %(KEY-value)s must be <= %(KEY-boundary)s"), {
                 key + '-value': value,
                 key + '-boundary': _message_value(self.maxvalue)}
+
         assert False, 'shouldnt be there'
 
-    def serialize(self):
+    def serialize(self) -> str:
         """simple text serialization"""
         return cstr_json_dumps({u'minvalue': self.minvalue,
                                 u'maxvalue': self.maxvalue,
                                 u'msg': self.msg})
 
     @classmethod
-    def deserialize(cls, value):
+    def deserialize(cls: Type['IntervalBoundConstraint'], value: str) -> 'IntervalBoundConstraint':
         """simple text deserialization"""
         try:
             d = cstr_json_loads(value)
+
             return cls(**d)
         except ValueError:
             minvalue, maxvalue = value.split(';')
+
             return cls(eval(minvalue), eval(maxvalue))
 
 
@@ -445,19 +518,19 @@ class StaticVocabularyConstraint(BaseCon
     """Enforces a predefined vocabulary set for the value."""
     __implements__ = IVocabularyConstraint
 
-    def __init__(self, values, msg=None):
+    def __init__(self, values, msg=None) -> None:
         super(StaticVocabularyConstraint, self).__init__(msg)
         self.values = tuple(values)
 
-    def __str__(self):
+    def __str__(self) -> str:
         return 'value in (%s)' % u', '.join(
             repr(str(word)) for word in self.vocabulary())
 
-    def check(self, entity, rtype, value):
+    def check(self, entity, rtype, value) -> bool:
         """return true if the value is in the specific vocabulary"""
         return value in self.vocabulary(entity=entity)
 
-    def _failed_message(self, entity, key, value):
+    def _failed_message(self, entity, key: str, value) -> Tuple[str, Dict[str, Any]]:
         if isinstance(value, str):
             value = u'"%s"' % str(value)
             choices = u', '.join('"%s"' % val for val in self.values)
@@ -468,167 +541,187 @@ class StaticVocabularyConstraint(BaseCon
             key + '-value': value,
             key + '-choices': choices}
 
-    def vocabulary(self, **kwargs):
+    def vocabulary(self, **kwargs) -> tuple:
         """return a list of possible values for the attribute"""
         return self.values
 
-    def serialize(self):
+    def serialize(self) -> str:
         """serialize possible values as a json object"""
         return cstr_json_dumps({u'values': self.values, u'msg': self.msg})
 
     @classmethod
-    def deserialize(cls, value):
+    def deserialize(cls: Type['StaticVocabularyConstraint'],
+                    value: str) -> 'StaticVocabularyConstraint':
         """deserialize possible values from a csv list of evaluable strings"""
         try:
             values = cstr_json_loads(value)
+
             return cls(**values)
         except ValueError:
-            values = [eval(w) for w in re.split('(?<!,), ', value)]
-            if values and isinstance(values[0], str):
-                values = [v.replace(',,', ',') for v in values]
-            return cls(values)
+            interpreted_values = [eval(w) for w in re.split('(?<!,), ', value)]
+
+            if interpreted_values and isinstance(interpreted_values[0], str):
+                str_values = cast(List[str], interpreted_values)
+
+                interpreted_values = [v.replace(',,', ',') for v in str_values]
+
+            return cls(interpreted_values)
 
 
 class FormatConstraint(StaticVocabularyConstraint):
 
-    regular_formats = (_('text/rest'),
-                       _('text/markdown'),
-                       _('text/html'),
-                       _('text/plain'),
-                       )
+    regular_formats: Tuple[str, ...] = (_('text/rest'),
+                                        _('text/markdown'),
+                                        _('text/html'),
+                                        _('text/plain'),
+                                        )
 
-    def __init__(self, msg=None, **kwargs):
+    # **kwargs to have a common interface between all Constraint initializers
+    def __init__(self, msg=None, **kwargs) -> None:
         values = self.regular_formats
         super(FormatConstraint, self).__init__(values, msg=msg)
 
-    def check_consistency(self, subjschema, objschema, rdef):
+    def check_consistency(self, subjschema: _SubjObjSchema, objschema: _SubjObjSchema,
+                          rdef: _RdefType) -> None:
+
         if not objschema.final:
             raise BadSchemaDefinition("format constraint doesn't apply to non "
                                       "final entity type")
+
         if not objschema == 'String':
             raise BadSchemaDefinition("format constraint only apply to String")
 
 
-FORMAT_CONSTRAINT = FormatConstraint()
+FORMAT_CONSTRAINT: FormatConstraint = FormatConstraint()
 
 
 class MultipleStaticVocabularyConstraint(StaticVocabularyConstraint):
     """Enforce a list of values to be in a predefined set vocabulary."""
     # XXX never used
 
-    def check(self, entity, rtype, values):
+    def check(self, entity, rtype: _RtypeType, values: Sequence) -> bool:
         """return true if the values satisfy the constraint, else false"""
         vocab = self.vocabulary(entity=entity)
+
         for value in values:
             if value not in vocab:
                 return False
+
         return True
 
 
 # special classes to be used w/ constraints accepting values as argument(s):
 # IntervalBoundConstraint
 
-def actual_value(value, entity):
+def actual_value(value, entity) -> Any:
     if hasattr(value, 'value'):
         return value.value(entity)
+
     return value
 
 
 class Attribute(object):
-    def __init__(self, attr):
+    def __init__(self, attr) -> None:
         self.attr = attr
 
-    def __str__(self):
+    def __str__(self) -> str:
         return '%s(%r)' % (self.__class__.__name__, self.attr)
 
-    def value(self, entity):
+    def value(self, entity) -> Any:
         return getattr(entity, self.attr)
 
 
 class NOW(object):
-    def __init__(self, offset=None):
+    def __init__(self, offset: Optional[datetime.timedelta] = None) -> None:
         self.offset = offset
 
-    def __str__(self):
+    def __str__(self) -> str:
         return '%s(%r)' % (self.__class__.__name__, self.offset)
 
-    def value(self, entity):
+    def value(self, entity) -> datetime.date:
         now = yams.KEYWORD_MAP['Datetime']['NOW']()
+
         if self.offset:
             now += self.offset
+
         return now
 
 
 class TODAY(object):
-    def __init__(self, offset=None, type='Date'):
+    def __init__(self, offset: Optional[datetime.timedelta] = None, type: str = 'Date') -> None:
         self.offset = offset
         self.type = type
 
     def __str__(self):
         return '%s(%r, %r)' % (self.__class__.__name__, self.offset, self.type)
 
-    def value(self, entity):
+    def value(self, entity) -> datetime.date:
         now = yams.KEYWORD_MAP[self.type]['TODAY']()
+
         if self.offset:
             now += self.offset
+
         return now
 
 
 # base types checking functions ###############################################
 
-def check_string(eschema, value):
+def check_string(eschema, value) -> bool:
     """check value is an unicode string"""
     return isinstance(value, str)
 
 
-def check_password(eschema, value):
+def check_password(eschema, value) -> bool:
     """check value is an encoded string"""
     return isinstance(value, bytes)
 
 
-def check_int(eschema, value):
+def check_int(eschema, value) -> bool:
     """check value is an integer"""
     try:
         int(value)
     except ValueError:
         return False
+
     return True
 
 
-def check_float(eschema, value):
+def check_float(eschema, value) -> bool:
     """check value is a float"""
     try:
         float(value)
     except ValueError:
         return False
+
     return True
 
 
-def check_decimal(eschema, value):
+def check_decimal(eschema, value) -> bool:
     """check value is a Decimal"""
     try:
         decimal.Decimal(value)
     except (TypeError, decimal.InvalidOperation):
         return False
+
     return True
 
 
-def check_boolean(eschema, value):
+def check_boolean(eschema, value) -> bool:
     """check value is a boolean"""
     return isinstance(value, int)
 
 
-def check_file(eschema, value):
+def check_file(eschema, value) -> bool:
     """check value has a getvalue() method (e.g. StringIO or cStringIO)"""
     return hasattr(value, 'getvalue')
 
 
-def yes(*args, **kwargs):
+def yes(*args, **kwargs) -> bool:
     """dunno how to check"""
     return True
 
 
-BASE_CHECKERS = {
+BASE_CHECKERS: Dict[str, Callable[[Any, Any], bool]] = {
     'Date': yes,
     'Time': yes,
     'Datetime': yes,
@@ -645,7 +738,8 @@ BASE_CHECKERS = {
     'Bytes': check_file,
 }
 
-BASE_CONVERTERS = {
+BASE_CONVERTERS: Dict[str, Callable[[Any],
+                                    Union[str, bytes, int, float, bool, decimal.Decimal]]] = {
     'String': str,
     'Password': bytes,
     'Int': int,
@@ -656,27 +750,31 @@ BASE_CONVERTERS = {
 }
 
 
-def patch_sqlite_decimal():
+def patch_sqlite_decimal() -> None:
     """patch Decimal checker and converter to bypass SQLITE Bug
     (SUM of Decimal return float in SQLITE)"""
 
-    def convert_decimal(value):
+    def convert_decimal(value) -> decimal.Decimal:
         # XXX issue a warning
         if isinstance(value, float):
             value = str(value)
+
         return decimal.Decimal(value)
 
-    def check_decimal(eschema, value):
+    def check_decimal(eschema, value) -> bool:
         """check value is a Decimal"""
         try:
             if isinstance(value, float):
                 return True
+
             decimal.Decimal(value)
         except (TypeError, decimal.InvalidOperation):
             return False
+
         return True
 
     global BASE_CONVERTERS
     BASE_CONVERTERS['Decimal'] = convert_decimal
+
     global BASE_CHECKERS
     BASE_CHECKERS["Decimal"] = check_decimal
diff --git a/yams/types.py b/yams/types.py
--- a/yams/types.py
+++ b/yams/types.py
@@ -21,13 +21,22 @@
 from typing import Union, TYPE_CHECKING, TypeVar
 
 
+_jsonSerializableType = TypeVar("_jsonSerializableType")
+
+
 # to avoid circular imports
 if TYPE_CHECKING:
     from yams import schema, buildobjs
 
-    _RdefType = Union[schema.RelationDefinitionSchema, buildobjs.RelationDefinition]
+    _RdefRdefSchemaType = Union[schema.RelationDefinitionSchema, buildobjs.RelationDefinition]
+    _RdefType = buildobjs.RelationDefinition
     _SchemaType = schema.Schema
+    _SubjObjSchema = schema.EntitySchema
+    _RtypeType = schema.RelationSchema
 
 else:
+    _RdefRdefSchemaType = TypeVar("rdef_rdefschema")
     _RdefType = TypeVar("rdef")
     _SchemaType = TypeVar("schema")
+    _SubjObjSchema = TypeVar("subjobjschema")
+    _RtypeType = TypeVar("rtype")



More information about the cubicweb-devel mailing list