Skip to content

Instantly share code, notes, and snippets.

@earonesty
Last active October 8, 2019 20:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save earonesty/29e65d9161f812c8088d60c7c5892315 to your computer and use it in GitHub Desktop.
Save earonesty/29e65d9161f812c8088d60c7c5892315 to your computer and use it in GitHub Desktop.
import json
import os
from hashlib import md5
from urllib import parse as urlparse

import fastjsonschema as fjs

# Directory containing this module; "file:" schema references are resolved
# relative to it.
_schema_dir = os.path.dirname(__file__)


def _load_file_schema(uri):
    """Load and parse the JSON document named by the path component of *uri*.

    Leading slashes are stripped from the URI path and the remainder is
    looked up inside ``_schema_dir``.

    Raises whatever ``open`` / ``json.load`` raise for a missing or
    malformed file.
    """
    relative = urlparse.urlsplit(uri).path.lstrip('/')
    # os.path.join keeps the path relative when _schema_dir is empty, where
    # plain "/" concatenation would have produced a root-anchored path.
    # The context manager closes the handle instead of leaving it to the GC.
    with open(os.path.join(_schema_dir, relative)) as handle:
        return json.load(handle)


# URI scheme -> loader callable; passed as ``handlers`` both to
# fastjsonschema and to SchemaFormatCompiler.compile() in this file.
_file_loader = {'file': _load_file_schema}

def gen_var_name(schema, step="format"):
    """Derive a deterministic identifier from *schema* and *step*.

    The name is ``"_scmod_"`` followed by the MD5 hex digest of the JSON
    serialisation of a shallow copy of *schema* in which ``"step"`` is set to
    *step* and the top-level ``"$comment"`` key is removed.  The caller's
    dict is not modified.

    Only the top-level ``"$comment"`` is ignored; nested ones still
    contribute to the digest, and so does key order (``json.dumps`` is called
    without ``sort_keys``).
    """
    keyed = schema.copy()
    keyed["step"] = step
    keyed.pop('$comment', None)
    digest = md5(json.dumps(keyed).encode("utf8")).hexdigest()
    return "_scmod_" + digest

from pybase64 import b64decode, b64encode
def transcoder(data, type, context):
    """Convert *data* between raw bytes and base64 text.

    Args:
        data: the value to convert.
        type: the schema "format" string supplied by the generated
            formatter; it is not consulted here.  (The name shadows the
            builtin but is kept because it is part of the signature.)
        context: ``"encode"`` to base64-encode *data* and return the result
            as ``str``; any other value base64-decodes *data*.

    Returns:
        ``str`` when encoding, otherwise whatever ``b64decode`` returns.
    """
    if context == "encode":
        return b64encode(data).decode('utf8')
    return b64decode(data)

def anyof_guru(data, context, valfunc, ffunc):
    """Run a validator and a formatter in the order *context* requires.

    Args:
        data: the value to process.
        context: ``"encode"`` formats first and validates the formatted
            result; any other value validates *data* first and then formats.
        valfunc: one-argument validator; its return value is ignored.
        ffunc: ``ffunc(data, context)`` formatter whose result is returned.

    Returns:
        The value returned by *ffunc*.  Exceptions raised by either callable
        propagate to the caller.
    """
    if context == "encode":
        formatted = ffunc(data, context)
        valfunc(formatted)
        return formatted
    valfunc(data)
    return ffunc(data, context)

class SchemaFormatCompiler:
    """Generate and exec Python functions that transcode schema "format" fields.

    For a JSON schema, ``compile()`` emits one ``name(data, context)``
    function per sub-schema that (directly or through its children) carries a
    ``"format"`` keyword, then executes the generated source in this module's
    globals and returns the top-level function.

    NOTE(review): generated functions and the transcoder are stored in module
    globals under names derived from schema content only, so compiling the
    same (sub-)schema again with a different transcoder rebinds those globals
    and affects formatters returned earlier.

    NOTE(review): the generated source is passed to ``exec``.  Function names
    are digests from ``gen_var_name`` and property/format strings are
    embedded with ``%r``, but schemas should still come from a trusted
    source.
    """

    @staticmethod
    def _noop():
        """Do nothing.  Not called anywhere in this class."""
        pass

    def __init__(self):
        # Source text of every function generated by the current compile().
        self._all_code = ""
        # Names of the functions already present in _all_code.
        self._defined = set()

    def _compile_formatter(self, schema, transcoder, handlers):
        """Emit the formatter for *schema* and return its function name.

        Args:
            schema: dict form of a JSON (sub-)schema.
            transcoder: NAME of the module global holding the transcoder
                callable (a string spliced into the generated source).
            handlers: mapping of URI scheme -> loader used to resolve
                ``$ref`` values.

        Returns:
            The generated function's name, or ``""`` when *schema* needs no
            formatting.

        NOTE(review): nothing tracks schemas that are still being compiled,
        so a ``$ref`` cycle recurses without bound.
        """
        this_fname = gen_var_name(schema)
        if this_fname in self._defined:
            # Identical schema already emitted during this compile(); the
            # regenerated body would be the same, so reuse it.
            return this_fname

        any_of = schema.get('anyOf')
        if any_of:
            body = self._anyof_body(any_of, transcoder, handlers)
        elif '$ref' in schema:
            ref = schema['$ref']
            scheme = urlparse.urlsplit(ref)[0]
            # A reference gets no wrapper of its own: use the target's formatter.
            target = handlers[scheme](ref)
            return self._compile_formatter(target, transcoder, handlers) or ""
        elif 'type' not in schema:
            return ""
        else:
            kind = schema['type']
            if kind == "object":
                body = self._object_body(schema, transcoder, handlers)
            elif kind == "array":
                body = self._array_body(schema, transcoder, handlers)
            elif "format" in schema:
                # %r yields a valid string literal even if the format value
                # contains quotes or backslashes.
                body = "    return %s(data, %r, context)\n" % (transcoder, schema["format"])
            else:
                body = ""

        if not body:
            return ""
        self._all_code += "def %s(data, context):\n%s\n" % (this_fname, body)
        self._defined.add(this_fname)
        return this_fname

    def _anyof_body(self, alternatives, transcoder, handlers):
        """Return the function body trying each ``anyOf`` alternative in order, or ``""``."""
        lines = []
        needs_formatting = False
        for alt in alternatives:
            validator = gen_var_name(alt, "valid")
            if not globals().get(validator):
                # Validators are cached in module globals so the generated
                # code can call them by name.
                # NOTE(review): uses the module-level _file_loader rather
                # than the ``handlers`` argument -- confirm this is intended.
                globals()[validator] = fjs.compile(alt, handlers=_file_loader)
            formatter = self._compile_formatter(alt, transcoder, handlers)
            if formatter:
                needs_formatting = True
                lines.append("    try: return anyof_guru(data, context, %s, %s)\n"
                             % (validator, formatter))
            else:
                lines.append("    try: %s(data); return data\n" % (validator,))
            # Remember the failure and fall through to the next alternative.
            lines.append("    except Exception as e: ex=e\n")
        if not needs_formatting:
            # No alternative transforms the data, so no function is needed.
            return ""
        # Literal generated code: the %s here are formatted when it RUNS.
        lines.append("    raise Exception('Any of encoding fail: %s:%s' % (type(ex), ex))\n")
        return "".join(lines)

    def _object_body(self, schema, transcoder, handlers):
        """Return the function body formatting each formattable property, or ``""``."""
        lines = []
        for name, prop_schema in schema.get('properties', {}).items():
            formatter = self._compile_formatter(prop_schema, transcoder, handlers)
            if formatter:
                # The assignment must be indented one level deeper than its
                # ``if`` for the generated source to be valid.
                lines.append("    if %r in data:\n" % (name,))
                lines.append("        data[%r] = %s(data[%r], context)\n"
                             % (name, formatter, name))
        if not lines:
            return ""
        lines.append("    return data\n")
        return "".join(lines)

    def _array_body(self, schema, transcoder, handlers):
        """Return the function body formatting every element in place, or ``""``."""
        items = schema.get('items')
        # A list-valued ``items`` cannot be hashed by gen_var_name; skip it.
        if not isinstance(items, dict) or not items:
            return ""
        formatter = self._compile_formatter(items, transcoder, handlers)
        if not formatter:
            return ""
        return ("    for i, x in enumerate(data): data[i] = %s(x, context)\n"
                "    return data\n" % (formatter,))

    def compile(self, schema, transcoder, handlers=None):
        """Build and return the formatter function for *schema*.

        Args:
            schema: dict form of the JSON schema.
            transcoder: callable invoked by generated code as
                ``transcoder(data, format, context)``.
            handlers: optional mapping of URI scheme -> loader for ``$ref``
                resolution; defaults to an empty mapping.

        Returns:
            A callable taking ``(data, context)``.  When the schema contains
            nothing to format, an identity function returning its first
            argument.
        """
        if handlers is None:
            handlers = {}
        self._all_code = ""
        self._defined = set()
        # Generated code reaches the transcoder through a module global.
        tn = gen_var_name(schema, "transcoder")
        globals()[tn] = transcoder
        name = self._compile_formatter(schema, tn, handlers)
        exec(self._all_code, globals())
        if name:
            return globals()[name]
        return lambda x, y: x

def test_load_schema():
    """Compile every ``*.json`` file in ``_schema_dir``; passes if none raises."""
    for filename in os.listdir(_schema_dir):
        if not filename.endswith(".json"):
            continue
        # Close each schema file as soon as it has been parsed.
        with open(os.path.join(_schema_dir, filename)) as handle:
            schema = json.load(handle)
        SchemaFormatCompiler().compile(schema, transcoder, handlers=_file_loader)

def test_format_rename():
    """Decoding a rename operation turns the base64 ``encShard`` into bytes in place."""
    fpath = os.path.join(_schema_dir, "mofnop_operations.json")
    with open(fpath) as handle:
        schema = json.load(handle)
    ff = SchemaFormatCompiler().compile(schema, transcoder, handlers=_file_loader)
    obj1 = {
        "fileID": "boo",
        "encShard": "aGVsbG8=",
        "newName": "rokr",
    }
    ff(obj1, "decode")
    assert obj1["encShard"] == b'hello'
    # Properties without a "format" keyword get no formatter and stay as-is.
    assert obj1["fileID"] == "boo"
    assert obj1["newName"] == "rokr"

{
"$comment": "mofnop:rename",
"type": "object",
"properties": {
"fileID": {
"$comment": "object ID if a file",
"type": "string"
},
"encShard": {
"$comment" : "Optionally, the destination device's encrypted shard for this object",
"type": "string",
"format": "bytes"
},
"metadata": {
"$comment" : "The metadata accompanying the encShard",
"type": "string"
},
"dirPath": {
"$comment": "dir path relative to the virtual root",
"type": "string"
},
"newName": {
"type": "string"
}
},
"required": [
"newName"
],
"oneOf" : [
{ "required" : ["fileID"] },
{ "required" : ["dirPath"] }
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment