Created
June 22, 2017 18:30
-
-
Save galvez/18425fddfbbe733da637e1ebd3e104d0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import code
import json
import os
import pprint
import re
import time

from marshmallow import Schema
from marshmallow.schema import Schema
from marshmallow.fields import Str, Bool, Int, List, Nested, DateTime, URL
from werkzeug.routing import Map, Rule, NotFound, RequestRedirect
# Absolute path of the directory containing this module; the SCHEMAS
# definition file is opened relative to it.
BASE = os.path.dirname(os.path.abspath(__file__))
def schemas(host='api.host'):
    """Build the URL-to-schema routing table and return the validator.

    Reads the ``SCHEMAS`` file found in ``BASE``.  A line that starts in the
    first column is a schema spec and is compiled with ``_gen_schema``; every
    indented line that follows is a URL rule attached to the most recently
    compiled schema.  Blank lines are skipped.

    :param host: server name the werkzeug ``Map`` is bound to.
    :returns: the ``_validate`` function, with the bound matcher stored on
        its ``matcher`` attribute.
    :raises ValueError: if a URL rule appears before any schema spec.
    """
    url_map = Map()
    current_schema = None
    with open('%s/SCHEMAS' % BASE) as schema_file:
        for line in schema_file:
            line = line.rstrip()
            if not line:
                continue
            if not line[0].isspace():
                # Unindented line: a new schema spec that applies to the
                # indented rules below it.
                current_schema = _gen_schema(line)
                continue
            if current_schema is None:
                # Previously this surfaced as an unbound-local error.
                raise ValueError(
                    'URL rule %r appears before any schema spec' % line.strip()
                )
            # The endpoint name is derived from the still-indented line, so
            # the leading whitespace collapses into a leading '-'.
            endpoint = re.sub(r'\W+', '-', line)
            rule = Rule(line.strip(), endpoint=endpoint)
            rule.schema = current_schema
            url_map.add(rule)
    _validate.matcher = url_map.bind(host)
    return _validate
def _validate(uri, response):
    """Validate a response payload against the schema registered for *uri*.

    ``schemas()`` must have been called beforehand: it stores the bound URL
    matcher on ``_validate.matcher``.

    :param uri: path to look up in the routing table.
    :param response: payload handed to the matched schema's ``loads``.
    :returns: the errors value produced by ``loads``, or ``None`` when the
        URI matches no rule or routing requests a redirect.
    """
    try:
        rule, _args = _validate.matcher.match(uri, return_rule=True)
    # The exception classes must be a tuple: ``except A, B`` in Python 2
    # catches only A and binds the instance to the name B (and is a syntax
    # error in Python 3).
    except (NotFound, RequestRedirect):
        return None
    _data, errors = rule.schema().loads(response)
    return errors
def _gen_schema(spec):
    """Compile one schema spec line into a marshmallow ``Schema`` subclass.

    Groups written as ``name:{...}`` (optionally followed by ``?``) become
    ``Nested(..., many=True)`` fields whose inner schema is built from the
    text between the braces.  Whatever remains of the spec is parsed as flat
    ``name:type`` pairs by ``_handle_nested``.  A comma-separated list of
    names shares a single definition.

    :param spec: text of one schema definition line.
    :returns: a new ``Schema`` subclass produced by ``_make_schema``.
    """
    fields = []
    # NOTE(review): ``.+`` is greedy, so two sibling ``{...}`` groups on the
    # same line are captured as one -- confirm specs never contain more than
    # one nested group per line.
    nested_groups = re.findall(r'([\w,]+):{(.+)}(\??)', spec.strip())
    for names, body, optional in nested_groups:
        # A trailing '?' marks the group optional; this matches how
        # ``_handle_nested`` derives ``required`` for flat fields (the flag
        # used to be negated twice, making un-marked groups optional).
        required = not optional
        for name in names.split(','):
            inner = _gen_schema._make_schema(
                dict(_gen_schema._handle_nested(body))
            )
            fields.append([name, Nested(inner, required=required, many=True)])
        # Drop the consumed group so its inner pairs are not parsed again as
        # top-level fields below.
        spec = spec.replace('%s:{%s}' % (names, body), '')
    # NOTE(review): as written this replaces a space with a space; presumably
    # it was meant to collapse the gap left by removed groups.  The flat-field
    # regex does not depend on spacing either way.
    spec = spec.replace(' ', ' ')
    fields += _gen_schema._handle_nested(spec)
    return _gen_schema._make_schema(fields)
def _make_schema(fields):
    """Create a new ``Schema`` subclass from field definitions.

    :param fields: mapping of field name to field object, or a list of
        ``[name, field]`` pairs (converted with ``dict``).
    :returns: a dynamically created subclass of ``Schema`` whose class
        attributes are the given fields.
    """
    if isinstance(fields, list):
        fields = dict(fields)
    # Timestamp-derived class-name suffix; classes created within the same
    # clock tick may end up sharing a name.
    stamp = str(time.time()).replace('.', '')
    return type('schema_%s' % stamp, (Schema,), fields)
def _handle_nested(spec):
    """Parse flat ``name:type`` pairs into ``[name, field]`` entries.

    ``name`` may be a comma-separated list of names sharing one type.  A
    type ending in ``[]`` is turned into a list field via ``_handle_list``;
    any other type is looked up in ``_gen_schema._type_hash``.  A trailing
    ``?`` makes the field non-required.

    :param spec: text containing zero or more ``name:type`` pairs.
    :returns: list of ``[name, field]`` pairs, one per declared name.
    :raises KeyError: if a type keyword is missing from the type table.
    """
    fields = []
    pairs = re.findall(r'([\w,]+):([\w\[\]]+)(\??)', spec)
    for names, type_name, optional in pairs:
        required = not optional
        for name in names.split(','):
            # A fresh field object is built for every name, even when
            # several names share the same declaration.
            if type_name.endswith('[]'):
                field = _gen_schema._handle_list(type_name, required)
            else:
                field = _gen_schema._type_hash[type_name](required=required)
            fields.append([name, field])
    return fields
def _handle_list(ltype, required):
    """Build a ``List`` field for a ``type[]`` declaration.

    :param ltype: type keyword followed by ``[]``, e.g. ``str[]``.
    :param required: value forwarded as the field's ``required`` flag.
    :returns: a ``List`` field whose element type is the field class
        registered for the keyword in ``_gen_schema._type_hash``.
    """
    element_keyword = ltype.partition('[]')[0]
    element_field = _gen_schema._type_hash[element_keyword]
    return List(element_field, required=required)
# The helpers are attached to ``_gen_schema`` as function attributes; the
# parsing functions call each other through these attributes.
_gen_schema._make_schema = _make_schema
_gen_schema._handle_nested = _handle_nested
_gen_schema._handle_list = _handle_list

# Type keyword used in schema specs -> marshmallow field class.
_gen_schema._type_hash = dict(
    str=Str,
    bool=Bool,
    int=Int,
    dt=DateTime,
    url=URL,
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment