-
-
Save amcgregor/415813e9d9dd8d374be6e5b952b20435 to your computer and use it in GitHub Desktop.
An absolute mess of work-in-progress prototyping to try to figure out just how the hojek the `typing` module actually works.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Python 3 function annotation typecasting support.""" | |
import collections | |
import typing | |
import inspect | |
from enum import Enum | |
from collections import abc | |
from re import compile as regex | |
from io import StringIO, TextIOBase, RawIOBase, IOBase, BufferedIOBase | |
from inspect import BoundArguments, Parameter, isabstract, isclass, ismethod, signature | |
from typeguard import check_argument_types | |
""" | |
https://docs.python.org/3/library/typing.html | |
https://docs.python.org/3/library/collections.abc.html#collections.abc.Collection | |
https://docs.python.org/3/library/abc.html#module-abc | |
https://github.com/agronholm/typeguard/blob/master/typeguard/__init__.py#L542 | |
https://pypi.org/project/typing-tools/ | |
https://pypi.org/project/enforce-typing/ | |
clear; ipython --no-banner -i typingtest.py | |
""" | |
from types import CodeType, FunctionType | |
from typing import ( | |
AbstractSet, | |
Any, | |
AsyncIterable, | |
AsyncIterator, | |
BinaryIO, | |
Callable, | |
Dict, | |
Generator, | |
IO, | |
Iterable, | |
Iterator, | |
List, | |
Optional, | |
Sequence, | |
Set, | |
TextIO, | |
Tuple, | |
Type, | |
TypeVar, | |
Union, | |
get_type_hints, | |
overload, | |
_GenericAlias, | |
) | |
from numbers import Number | |
try: | |
from typing import Literal, TypedDict | |
except ImportError: | |
Literal = TypedDict = None | |
try: | |
from typing import Literal, TypedDict | |
except ImportError: | |
Literal = TypedDict = None | |
try: | |
from typing import ForwardRef | |
evaluate_forwardref = ForwardRef._evaluate | |
except ImportError: | |
from typing import _ForwardRef as ForwardRef # Python < 3.8 | |
evaluate_forwardref = ForwardRef._eval_type | |
noop = object() | |
# https://github.com/agronholm/typeguard/blob/master/typeguard/__init__.py#L542 | |
aliases = { | |
abc.Container: noop, | |
abc.Hashable: noop, | |
abc.Iterable: noop, | |
abc.Iterator: iter, | |
abc.Mapping: dict, | |
abc.MutableMapping: dict, | |
abc.Reversible: noop, | |
abc.Sequence: noop, | |
abc.Set: set, | |
abc.Sized: noop, | |
typing.Any: noop, | |
typing.AnyStr: noop, | |
typing.Hashable: noop, | |
typing.Reversible: noop, | |
typing.Sized: noop, | |
typing.IO: StringIO, | |
typing.SupportsAbs: float, | |
typing.SupportsBytes: bytes, | |
typing.SupportsFloat: float, | |
typing.SupportsInt: int, | |
typing.SupportsRound: float, | |
typing.Pattern: regex, | |
typing.AbstractSet: set, | |
typing.OrderedDict: dict, | |
typing.FrozenSet: frozenset, | |
typing.Iterable: noop, | |
typing.List: list, | |
typing.Sequence: noop, | |
typing.Mapping: dict, | |
typing.MutableMapping: dict, | |
typing.MutableSequence: list, | |
typing.MutableSet: set, | |
typing.Set: set, | |
Number: (int, float), | |
} | |
## Container Annotation Support | |
_concrete_map: typing.Mapping[type, type] = { | |
typing.Set: set, | |
typing.List: list, | |
typing.Dict: dict, | |
Number: (int, float), | |
} | |
# This causes the internal machinery to treat these hypothetical container types as a different type of container. | |
_container_map: typing.Mapping[type, type] = { | |
# Sets | |
typing.AbstractSet: typing.Set, | |
typing.FrozenSet: typing.Set, | |
typing.MutableSet: typing.Set, | |
abc.Set: typing.Set, | |
abc.MutableSet: typing.Set, | |
frozenset: typing.Set, | |
# Lists | |
typing.Collection: typing.List, | |
typing.Iterable: typing.List, | |
typing.Iterator: typing.List, | |
typing.Sequence: typing.List, | |
typing.MutableSequence: typing.List, | |
abc.Collection: typing.List, | |
abc.Iterable: typing.List, | |
abc.Iterator: typing.List, | |
abc.Sequence: typing.List, | |
abc.MutableSequence: typing.List, | |
# Dictionaries | |
typing.Mapping: typing.Dict, | |
typing.MutableMapping: typing.Dict, | |
typing.OrderedDict: typing.Dict, | |
abc.Mapping: typing.Dict, | |
abc.MutableMapping: typing.Dict, | |
collections.OrderedDict: typing.Dict, | |
} | |
_origins = { | |
dict: ..., | |
typing.Dict: ..., | |
list: ..., | |
typing.List: ..., | |
typing.Sequence: ..., | |
abc.Sequence: ..., | |
set: ..., | |
typing.Set: ..., | |
tuple: ..., | |
typing.Tuple: ..., | |
typing.Union: ..., | |
typing.Type: ... | |
} | |
## Generalized Base Annotation Support | |
def cast_typevar(annotation:TypeVar, value:Any) -> None: | |
assert check_argument_types() | |
# Now we attempt exhaustive search of candidate types. | |
for T in annotation.__constraints__: | |
assert isclass(T) # Ensure the constraints are described using classes. | |
T = _container_map.get(getattr(T, 'origin', T), T) # Resolve any type annotation abstract types to concrete types. | |
T = _concrete_map.get(T, T) # Resolve any type annotation abstract types to concrete types. | |
if isinstance(value, T): return value # If we already satisfy the requirement, we're done. | |
if isabstract(T): continue # If the ultimate type is abstract, there's not much we can do with it. | |
try: # Otherwise, we'll attempt to cast by instantiating the class, passing the value. | |
value = T(value) | |
return value | |
except (TypeError, ValueError): | |
pass # This particular casting attempt failed, maybe the next will succeed? | |
raise TypeError(f"Unable to cast {value!r} to a type satisfying: {annotation!r}") | |
def cast_generic_alias(annotation:_GenericAlias, value:Any) -> None: | |
assert check_argument_types() | |
# First we try the type the alias was constructed from, if it was constructed from such a template. | |
try: | |
if not isinstance(value, annotation.__origin__): | |
value = annotation.__origin__(value) | |
return value | |
except (TypeError, ValueError): | |
pass | |
# Now we attempt exhaustive search of the candidate types. | |
for T in annotation.__args__: | |
if not isclass(T): continue | |
if isabstract(T): continue | |
try: | |
value = T(value) | |
return value | |
except (TypeError, ValueError): | |
pass # This particular casting attempt failed, maybe the next will succeed? | |
raise TypeError(f"Unable to cast {value!r} to a type satisfying: {annotation!r}") | |
## Container Type-Specific Casting | |
def cast_dict(annotation:TypeVar, value:abc.MutableMapping) -> dict: | |
assert check_argument_types() | |
result = {} | |
return result | |
def cast_sequence(annotation:TypeVar, value:Sequence) -> List: | |
assert check_argument_types() | |
result = [] | |
return result | |
## Internal Casting Machinery | |
def _cast(annotation, value): | |
if isinstance(annotation, tuple): # We've been mapped from an abstract type to one or more concrete types. | |
for T in annotation: # Attempt coersion to each concrete type until one works. | |
try: | |
return _cast(T, value) | |
break | |
except (TypeError, ValueError): continue | |
else: raise TypeError(f"Unable to cast {value} to any of: {', '.join(repr(T) for T in annotation)}") | |
if isinstance(annotation, _GenericAlias): | |
return cast_generic_alias(annotation, value) | |
if isinstance(annotation, TypeVar): | |
return cast_typevar(annotation, value) | |
if isclass(annotation): | |
return annotation(value) | |
raise TypeError(f"Unable to satisfy {annotation} through automated typecasting.") | |
## Public Interface | |
def cast_arguments(bound:BoundArguments): | |
sig = bound.signature.parameters | |
for k, v in bound.arguments.items(): | |
# if k == 'tags': __import__('pudb').set_trace() | |
annotation = sig[k].annotation | |
annotation = _concrete_map.get(annotation, annotation) | |
bound.arguments[k] = _cast(annotation, v) | |
def example(name:typing.AnyStr, age:Number, *, tags:typing.Iterable[str]): | |
__import__('pudb').set_trace() | |
assert check_argument_types() | |
return (name, age, tags) | |
def example1(name:str, age:int, tags:typing.MutableSet): | |
return (name, age, tags) | |
def example2(name:str, age:int, tags:typing.MutableSet[str]): | |
return (name, age, tags) | |
def example3(name:str, ages:typing.FrozenSet[int]): | |
return (name, ages) | |
def example4(name:str, age:int, tags:typing.MutableSet[str]): | |
return (name, age, tags) | |
def example5(name:str, age:int, tags:typing.MutableSet[str]): | |
return (name, age, tags) | |
def example6(handle:str, age:int, tags:typing.FrozenSet[str]) -> Tuple[str, int, typing.FrozenSet[str]]: | |
return (handle, age, tags) | |
sig = signature(example) | |
args = ('amcgregor', ) | |
kwargs = {'age': '27', 'tags': ['bob', 'dole', 27]} | |
bound = sig.bind(*args, **kwargs) | |
#print(sig, bound, bound.signature.parameters, sep="\n", end="\n\n") | |
for name, value in bound.arguments.items(): | |
annotation = bound.signature.parameters[name].annotation | |
alias = aliases.get(annotation, annotation) | |
if not annotation or alias is noop: | |
... | |
# print(f"NOP\t{name}={value}") | |
if isclass(alias) and isinstance(value, alias): | |
... | |
# print(f"ALIAS\t{name}={value!r}") | |
if isinstance(alias, TypeVar): | |
... | |
print("TVAR\t{alias.__dict__}") | |
# print(name, bool(annotation), annotation, value) | |
print("Signature:", sig) | |
print("Incoming data:", args, kwargs) | |
print("Bound:", bound) | |
def interactive(): | |
cast_arguments(bound) | |
print("Processed bound:", bound) | |
if __name__ == "__main__": | |
interactive() | |
#_type_hints_map = WeakKeyDictionary() # type: Dict[FunctionType, Dict[str, Any]] | |
#T_CallableOrType = TypeVar('T_CallableOrType', Callable, Type[Any]) | |
#def resolve_forwardref(maybe_ref, memo: _CallMemo): | |
# if isinstance(maybe_ref, ForwardRef): | |
# return evaluate_forwardref(maybe_ref, memo.func.__globals__, {}) | |
# else: | |
# return maybe_ref | |
''' | |
SPLIT = lambda v: ",".split(v) if isinstance(v, str) else list(v) | |
AnnotationAliases = Mapping[type, type] | |
Mapper = Callable[[str], Any] | |
AnnotationMappers = Mapping[type, Mapper] | |
# Helpers utilized in the aliases and mappings below. | |
def die(type): | |
"""Handle the given ABC or typing hint by exploding.""" | |
def inner(value): | |
raise TypeError(f"Can not cast to {type!r}, concrete simple type references are preferred.") | |
return inner | |
def _nop(value): | |
"""A no-operation identity transformation if the abstract type implies or is satisfied by Unicode text. | |
Use of this indicates a Unicode string is a suitable member of that abstract set. | |
""" | |
return value | |
# Typecasting assistance. | |
def to_bytes(value:str) -> bytes: | |
return value.encode('utf8') if isinstance(value, str) else bytes(value), | |
# Many type annotations are "abstract", so we map them to "concrete" types to permit casting on ingress. | |
aliases:AnnotationAliases = { | |
# Core datatypes which may require some assistance to translate from the web. | |
bytes: to_bytes, | |
# Map abstract base classes to their constructors. | |
abc.ByteString: bytes, | |
abc.Container: nop, | |
abc.Hashable: nop, | |
abc.Iterable: nop, | |
abc.Iterator: iter, | |
abc.Mapping: dict, | |
abc.MutableMapping: dict, | |
abc.Reversible: nop, | |
abc.Sequence: nop, | |
abc.Set: set, | |
abc.Sized: nop, | |
# "Shallow" pseudo-types mapped to explosions, real types, or casting callables. | |
typing.Any: nop, | |
typing.AnyStr: lambda v: str(v), | |
typing.AsyncContextManager: die(typing.AsyncContextManager), | |
typing.AsyncGenerator: die(typing.AsyncGenerator), | |
typing.AsyncIterable: die(typing.AsyncIterable), | |
typing.AsyncIterator: die(typing.AsyncIterator), | |
typing.Awaitable: die(typing.Awaitable), | |
typing.ByteString: to_bytes, | |
typing.Callable: die(typing.Callable), | |
typing.ChainMap: die(typing.ChainMap), | |
typing.ClassVar: die(typing.ClassVar), | |
typing.ContextManager: die(typing.ContextManager), | |
typing.Coroutine: die(typing.Coroutine), | |
typing.Counter: die(typing.Counter), | |
typing.DefaultDict: die(typing.DefaultDict), | |
typing.ForwardRef: die(typing.ForwardRef), | |
typing.Generator: die(typing.Generator), | |
typing.Generic: die(typing.Generic), | |
typing.Hashable: nop, | |
typing.ItemsView: die(typing.ItemsView), # TODO: dict and call .items() | |
typing.Iterator: die(typing.Iterator), # TODO: automatically call .iter() | |
typing.KeysView: die(typing.KeysView), # TODO: dict and call .keys | |
typing.MappingView: die(typing.MappingView), # TODO: dict and call .values() | |
typing.Match: die(typing.Match), | |
typing.NamedTuple: die(typing.NamedTuple), | |
typing.Reversible: nop, | |
typing.Sized: nop, | |
typing.IO: StringIO, | |
typing.SupportsAbs: float, | |
typing.SupportsBytes: bytes, | |
typing.SupportsFloat: float, | |
typing.SupportsInt: int, | |
typing.SupportsRound: float, | |
typing.Pattern: regex, | |
# Potentially nested / recursive / "complex" pseudo-types. | |
typing.AbstractSet: set, | |
typing.Collection: die(typing.Collection), | |
typing.Container: die(typing.Container), | |
typing.OrderedDict: dict, | |
typing.FrozenSet: frozenset, | |
typing.Iterable: nop, | |
typing.List: list, | |
typing.Sequence: nop, | |
typing.Mapping: dict, | |
typing.MutableMapping: dict, | |
typing.MutableSequence: list, | |
typing.MutableSet: set, | |
typing.Optional: die(typing.Optional), # TODO: SPECIAL CASE TO UNPACK | |
typing.Set: set, | |
typing.Tuple: die(typing.Tuple), # TODO: Container with possible nested types. | |
# typing.: die(typing.), | |
} | |
mapper:AnnotationMappers = { # Mechanisms to produce the desired type from basic Unicode text input. | |
list: lambda v: v.split(",") if isinstance(v, str) else list(v), | |
set: lambda v: v.split(",") if isinstance(v, str) else set(v), | |
# dict: ... | |
} | |
def collect(self, context:Context, handler:Callable, args:List, kw:Dict[str,Any]) -> None: | |
"""Inspect and potentially mutate the arguments to the handler. | |
The args list and kw dictionary may be freely modified, though invalid arguments to the handler will fail. | |
""" | |
spec = getfullargspec(handler) | |
arguments = list(spec.args) | |
if ismethod(handler): del arguments[0] # Automatically remove `self` arguments from consideration. | |
def cast(key, annotation, value): | |
"""Attempt to typecast data incoming from the web.""" | |
annotation = self.aliases.get(annotation, annotation) | |
if isinstance(annotation, type) and isinstance(value, annotation): return value # Nothing to do. | |
annotation = self.mapper.get(annotation, annotation) | |
try: | |
value = annotation(value) | |
except (ValueError, TypeError) as e: | |
raise HTTPBadRequest(f"{e.__class__.__name__}: {e} while processing endpoint argument '{arg}'") | |
return value | |
# Process positional arguments. | |
for i, (key, annotation, value) in enumerate((k, spec.annotations.get(k), v) for k, v in zip(arguments, args)): | |
if not annotation: continue # Skip right past non-annotated arguments. | |
args[i] = cast(key, annotation, value) | |
# Process keyword arguments. | |
for key, annotation, value in ((k, spec.annotations.get(k), v) for k, v in kw.items()): | |
if not annotation: continue # Skip right past non-annotated arguments. | |
kw[key] = cast(key, annotation, value) | |
''' | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment