Last active
October 8, 2019 20:46
-
-
Save earonesty/29e65d9161f812c8088d60c7c5892315 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import os
from hashlib import md5
from urllib import parse as urlparse

import fastjsonschema as fjs
# Directory containing this module and the JSON schema files next to it.
# Made absolute because os.path.dirname(__file__) can be '' (e.g. when the
# module is run from its own directory), and '' + "/" + name would then point
# at the filesystem root instead of the schema directory.
_schema_dir = os.path.dirname(os.path.abspath(__file__))


def _load_schema_file(uri):
    """Load the JSON document named by the path component of ``uri``.

    The path is taken from ``urlsplit(uri).path`` with leading slashes
    removed and resolved relative to ``_schema_dir``.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    relative = urlparse.urlsplit(uri).path.lstrip('/')
    # Context manager so the handle is closed as soon as parsing finishes.
    with open(os.path.join(_schema_dir, relative)) as fh:
        return json.load(fh)


# Maps a URI scheme to the callable that resolves references of that scheme.
_file_loader = {'file': _load_schema_file}
def gen_var_name(schema, step="format"):
    """Derive a deterministic identifier for a schema and a processing step.

    A shallow copy of ``schema`` is tagged with ``step`` under the ``"step"``
    key, its top-level ``"$comment"`` entry (if any) is dropped, and the
    result is serialized with ``json.dumps``. The returned name is
    ``"_scmod_"`` followed by the MD5 hex digest of that text. The caller's
    dict is not modified.
    """
    keyed = dict(schema)
    keyed["step"] = step
    if "$comment" in keyed:
        del keyed["$comment"]
    serialized = json.dumps(keyed)
    digest = md5(serialized.encode("utf8")).hexdigest()
    return "_scmod_" + digest
from pybase64 import b64decode, b64encode | |
def transcoder(data, type, context):
    """Convert a ``bytes``-format field between raw bytes and base64 text.

    Args:
        data: The field value: a bytes-like object when encoding, base64
            text when decoding.
        type: The schema ``format`` of the field. The name shadows the
            builtin but is kept because it is part of the signature.
        context: ``"encode"`` produces base64 text; any other value decodes.

    Returns:
        For format ``"bytes"``: a ``str`` of base64 text when encoding, the
        decoded ``bytes`` otherwise. For every other format ``data`` is
        returned unchanged.
    """
    # The compiler calls the transcoder for every schema node that has a
    # "format", so formats other than "bytes" must pass through untouched
    # instead of being pushed through base64.
    if type != "bytes":
        return data
    if context == "encode":
        return b64encode(data).decode('utf8')
    return b64decode(data)
def anyof_guru(data, context, valfunc, ffunc):
    """Apply a formatter and a validator in the order the context requires.

    When ``context`` is ``"encode"`` the formatter ``ffunc(data, context)``
    runs first and its result is passed to ``valfunc``. For any other
    context ``valfunc`` is called on the incoming ``data`` before it is
    formatted. The formatter's result is returned in both cases; whatever
    either callable raises propagates to the caller.
    """
    if context != "encode":
        # Validate the incoming representation, then convert it.
        valfunc(data)
        return ffunc(data, context)

    # Convert first, then validate the converted representation.
    formatted = ffunc(data, context)
    valfunc(formatted)
    return formatted
class SchemaFormatCompiler:
    """Generate functions that transcode ``format``-annotated schema fields.

    For a JSON schema, Python source is generated containing one
    ``name(data, context)`` function per schema node that (directly or
    through nested nodes) carries a ``format``. The source is executed into
    this module's globals and the function for the root schema is returned.
    """

    @staticmethod
    def _noop():
        """Do nothing."""
        # A staticmethod so it is callable on the class and on instances.

    def __init__(self):
        self._all_code = ""    # source text of all functions generated so far
        self._defined = set()  # names of functions already in _all_code

    def _compile_formatter(self, schema, transcoder, handlers):
        """Generate the formatter function for ``schema``.

        Args:
            schema: The schema node (a dict).
            transcoder: Name of the module global bound to the transcoder
                callable; it is spliced into the generated source.
            handlers: Mapping of URI scheme to a callable resolving a
                ``$ref`` value to a schema.

        Returns:
            The name of the generated function, or ``""`` when the node
            needs no formatting.

        Raises:
            KeyError: if a ``$ref`` uses a scheme missing from ``handlers``.
        """
        any_of = schema.get('anyOf')
        if any_of:
            body = self._any_of_body(any_of, transcoder, handlers)
        elif '$ref' in schema:
            ref = schema['$ref']
            scheme = urlparse.urlsplit(ref)[0]
            return self._compile_formatter(handlers[scheme](ref), transcoder, handlers)
        elif 'type' not in schema:
            return ""
        else:
            kind = schema['type']
            if kind == "object":
                body = self._object_body(schema, transcoder, handlers)
            elif kind == "array":
                body = self._array_body(schema, transcoder, handlers)
            elif "format" in schema:
                # %r emits a correctly escaped literal, so a format containing
                # quotes or backslashes cannot break the generated source.
                body = "    return %s(data, %r, context)\n" % (transcoder, schema["format"])
            else:
                body = ""

        if not body:
            return ""

        this_fname = gen_var_name(schema)
        # Identical schema nodes hash to the same name; emit each only once.
        if this_fname not in self._defined:
            self._all_code += "def %s(data, context):\n%s\n" % (this_fname, body)
            self._defined.add(this_fname)
        return this_fname

    def _any_of_body(self, alternatives, transcoder, handlers):
        """Build the body trying each ``anyOf`` alternative in order."""
        lines = []
        has_formatter = False
        for alt in alternatives:
            validator = gen_var_name(alt, "valid")
            if not globals().get(validator):
                # Resolve references with the caller's handlers, the same
                # ones used for the formatter side of this schema.
                globals()[validator] = fjs.compile(alt, handlers=handlers)
            formatter = self._compile_formatter(alt, transcoder, handlers)
            lines.append("    try:\n")
            if formatter:
                has_formatter = True
                lines.append("        return anyof_guru(data, context, %s, %s)\n"
                             % (validator, formatter))
            else:
                # Nothing to convert: the first alternative that validates wins.
                lines.append("        %s(data)\n" % (validator,))
                lines.append("        return data\n")
            lines.append("    except Exception as e:\n")
            # Copy to `ex`: Python unbinds `e` when the except block ends.
            lines.append("        ex = e\n")
        if not has_formatter:
            return ""
        lines.append("    raise Exception('Any of encoding fail: %s:%s' % (type(ex), ex))\n")
        return "".join(lines)

    def _object_body(self, schema, transcoder, handlers):
        """Build the body converting each formatted property that is present."""
        lines = []
        for name, sub in schema.get('properties', {}).items():
            formatter = self._compile_formatter(sub, transcoder, handlers)
            if formatter:
                # %r quotes the property name safely inside generated source.
                lines.append("    if %r in data:\n" % (name,))
                lines.append("        data[%r] = %s(data[%r], context)\n"
                             % (name, formatter, name))
        if not lines:
            return ""
        lines.append("    return data\n")
        return "".join(lines)

    def _array_body(self, schema, transcoder, handlers):
        """Build the body converting every element of an array in place."""
        items = schema.get('items')
        if not items:
            return ""
        formatter = self._compile_formatter(items, transcoder, handlers)
        if not formatter:
            return ""
        return ("    for i, item in enumerate(data):\n"
                "        data[i] = %s(item, context)\n"
                "    return data\n" % (formatter,))

    def compile(self, schema, transcoder, handlers=None):
        """Compile ``schema`` into a ``formatter(data, context)`` callable.

        Args:
            schema: The root schema (a dict).
            transcoder: Callable invoked by generated code as
                ``transcoder(data, format, context)``.
            handlers: Optional mapping of URI scheme to a callable that
                resolves ``$ref`` values; defaults to no handlers.

        Returns:
            The generated function for the root schema, or a function that
            returns its first argument unchanged when nothing in the schema
            needs formatting.
        """
        if handlers is None:
            handlers = {}
        self._all_code = ""
        self._defined = set()
        tn = gen_var_name(schema, "transcoder")
        # Generated code refers to the transcoder by this global name.
        globals()[tn] = transcoder
        name = self._compile_formatter(schema, tn, handlers)
        # NOTE(review): this executes generated source. Identifiers in it are
        # hash-derived and schema strings are embedded via repr(), but
        # schemas should still come from a trusted source.
        exec(self._all_code, globals())
        if name:
            return globals()[name]
        return lambda x, y: x
def test_load_schema():
    """Check that every ``*.json`` file in the schema directory compiles."""
    for filename in os.listdir(_schema_dir):
        if not filename.endswith(".json"):
            continue
        fpath = os.path.join(_schema_dir, filename)
        # Close each schema file as soon as it is parsed rather than leaving
        # the handle to the garbage collector.
        with open(fpath) as fh:
            schema = json.load(fh)
        SchemaFormatCompiler().compile(schema, transcoder, handlers=_file_loader)
def test_format_rename():
    """Check that decoding converts the base64 ``encShard`` field to bytes."""
    fpath = os.path.join(_schema_dir, "mofnop_operations.json")
    # Close the schema file once parsed instead of leaking the handle.
    with open(fpath) as fh:
        schema = json.load(fh)
    ff = SchemaFormatCompiler().compile(schema, transcoder, handlers=_file_loader)
    obj1 = {
        "fileID": "boo",
        "encShard": "aGVsbG8=",
        "newName": "rokr",
    }
    # The formatter rewrites the dict in place.
    ff(obj1, "decode")
    assert obj1["encShard"] == b'hello'
    # Properties without a "format" are left untouched.
    assert obj1["fileID"] == "boo"
    assert obj1["newName"] == "rokr"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"$comment": "mofnop:rename", | |
"type": "object", | |
"properties": { | |
"fileID": { | |
"$comment": "object ID if a file", | |
"type": "string" | |
}, | |
"encShard": { | |
"$comment" : "Optionally, the destination device's encrypted shard for this object", | |
"type": "string", | |
"format": "bytes" | |
}, | |
"metadata": { | |
"$comment" : "The metadata accompanying the encShard",
"type": "string" | |
}, | |
"dirPath": { | |
"$comment": "dir path relative to the virtual root", | |
"type": "string" | |
}, | |
"newName": { | |
"type": "string" | |
} | |
}, | |
"required": [ | |
"newName" | |
], | |
"oneOf" : [ | |
{ "required" : ["fileID"] }, | |
{ "required" : ["dirPath"] } | |
] | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment