|
#!/bin/python |
|
import sys |
|
|
|
import yaml |
|
import json |
|
from yaml.loader import SafeLoader |
|
|
|
|
|
def replace_refs(old_ref: str, new_ref: str, the_dict: dict) -> dict: |
|
for key, value in the_dict.items(): |
|
if type(value) is dict: |
|
the_dict[key] = replace_refs(old_ref, new_ref, value) |
|
if key == '$ref' and value == old_ref: |
|
print(f"replacing '{old_ref}' with '{new_ref}'") |
|
the_dict[key] = new_ref |
|
|
|
return the_dict |
|
|
|
|
|
# moves 'required: true' from property level to properties level |
|
# in an array of required properties |
|
def fix_required(the_dict: dict) -> dict: |
|
for key in list(the_dict): |
|
if type(the_dict[key]) is not dict: |
|
continue |
|
|
|
# entrypoint for check is 'schema' |
|
for current_key in list(the_dict[key]): |
|
if current_key != 'schema' or type(the_dict[key][current_key]) is not dict: |
|
continue |
|
|
|
schema_child_dicts = dict(the_dict[key][current_key]) |
|
|
|
# loop over 'schema' |
|
for schema_child_key in list(schema_child_dicts): |
|
if schema_child_key != 'properties' or type(the_dict[key][current_key][schema_child_key]) is not dict: |
|
continue |
|
|
|
properties_dict = dict(the_dict[key][current_key][schema_child_key]) |
|
# loop over 'properties' |
|
for property_key in list(properties_dict): |
|
if type(the_dict[key][current_key][schema_child_key][property_key]) is not dict: |
|
continue |
|
|
|
# loop over fields of single property |
|
property_items = dict(the_dict[key][current_key][schema_child_key][property_key]) |
|
for property_field_key in list(property_items): |
|
if property_field_key != 'required' \ |
|
or type( |
|
the_dict[key][current_key][schema_child_key][property_key][property_field_key]) \ |
|
is not bool: |
|
continue |
|
|
|
# replace and delete 'required' property in original the_dict. This mess is required as we cant |
|
# modify directly while looping |
|
print(f"moving required from property '{property_key}'") |
|
required_array = the_dict[key][current_key].get('required', []) |
|
required_array.append(property_key) |
|
the_dict[key][current_key]['required'] = required_array |
|
del the_dict[key][current_key][schema_child_key][property_key][property_field_key] |
|
|
|
the_dict[key] = fix_required(the_dict[key]) |
|
|
|
return the_dict |
|
|
|
|
|
# adds missing type: 'object' |
|
def add_missing_type_object(the_dict: dict): |
|
for key in list(the_dict): |
|
if type(the_dict[key]) is not dict: |
|
continue |
|
|
|
# entrypoint for check is 'schema' |
|
for current_key in list(the_dict[key]): |
|
if current_key != 'schema' or type(the_dict[key][current_key]) is not dict: |
|
continue |
|
|
|
schema_child_dicts = dict(the_dict[key][current_key]) |
|
|
|
# if we have 'properties' in schema but not a 'type', add 'type: object' |
|
if 'properties' in schema_child_dicts.keys() and 'type' not in schema_child_dicts.keys(): |
|
the_dict[key][current_key]['type'] = 'object' |
|
|
|
the_dict[key] = add_missing_type_object(the_dict[key]) |
|
|
|
return the_dict |
|
|
|
|
|
if __name__ == '__main__': |
|
if len(sys.argv) < 3: |
|
print('Usage: python3 main.py <input file (valid JSON or YAML)> <output file (.json, .yml or .yaml)>') |
|
exit(1) |
|
|
|
with open(sys.argv[1]) as f: |
|
data = yaml.load(f, Loader=SafeLoader) |
|
|
|
if data is None: |
|
print(f"could read data from input file {sys.argv[1]}") |
|
exit(1) |
|
|
|
# removes invalid components/definition section and puts it in components/schemas |
|
print('moving /components/definitions/* to /components/schemas') |
|
data['components']['schemas'] = data['components']['schemas'] | data['components']['definitions'] |
|
for definition_key in dict(data['components']['definitions']).keys(): |
|
replace_refs(f"#/components/definitions/{definition_key}", f"#/components/schemas/{definition_key}", data) |
|
del data['components']['definitions'] |
|
|
|
# removes invalid components/x-metadata object and puts it in components/schemas |
|
print('moving /components/x-metadata to /components/schemas/x-metadata') |
|
data['components']['schemas']['x-metadata'] = data['components']['x-metadata'] |
|
replace_refs('#/components/x-metadata', '#/components/schemas/x-metadata', data) |
|
del data['components']['x-metadata'] |
|
|
|
data = fix_required(data) |
|
data = add_missing_type_object(data) |
|
|
|
with open(sys.argv[2], 'w') as output_file: |
|
if str(sys.argv[2]).endswith(".json"): |
|
print(f"writing processed spec to json file {sys.argv[2]} ...") |
|
json.dump(data, output_file) |
|
print("done") |
|
elif str(sys.argv[2]).endswith(".yml") or str(sys.argv[2]).endswith(".yaml"): |
|
print(f"writing processed spec to yaml file {sys.argv[2]} ...") |
|
yaml.dump(data, output_file) |
|
print("done") |
|
else: |
|
print("could not save file. Output file must have extension 'json', 'yml' or 'yaml'.") |