This will work for any model that you hook it up to, you just need to define the types in your service schema.
As it’s inspecting the metadata that graphql carries with each query to figure out which fields correspond to what type to construct the mutations (and the types have to be the same as those in the model in Q) because it assumes the boilerplate will look like:
def create_mutation_singular(type):
mutation = "mutation addData($input: Add{0}Input!){{ add{0}(input: $input ) }}".format(
type)
return mutation
One other improvement that can be made is that right now it only does one instance at a time (and it’s not heavily tested on arrays of things), so adding another mutation that it can create for adding multiple X would make it faster in this case and can easily be dropped in graphql_tools.py
.
And one small change (shouldn’t come up often) that hasn’t been merged yet, if you query for fields that you are not passing in as data to be uploaded, right now it errors out
Note the function, add_generic_instance
.
from CKGClient import CKGClient
import json
import string
import aiohttp
import base64
import pandas as pd
import io
import logging
from datetime import datetime, timedelta, date
import copy
import utils
import graphql as gql
from asyncio import ensure_future
import copy
QTYS = ['RateQty' 'VolumeQty',
'DensityQty', 'WeightQty', 'SizeQty', 'DistanceQty', 'TimeQty', 'MassQty'
'SpeedQty']
logger = logging.getLogger(__name__)
resolvers = {
'Query': {
'fleetSchedule': lambda value, info, id: pass_query_to_client(info, info.context['shipping_model_test'])
},
'Mutation': {
'addFleetSchedule': lambda value, info, fleetSchedule: add_generic_instance(fleetSchedule, info.context['shipping_model_client'], info),
'addVesselPortIncompatability': lambda value, info, vesselPortIncompatability: add_generic_instance(vesselPortIncompatability, info.context['shipping_model_client'], info),
},
'TimeCharterVesselSchedule': {},
'CostQty': {},
'VesselAction': {},
'FleetSchedule': {},
'Port': {},
'LoadDetail': {},
'VoyageDetail': {},
'Requirement': {},
'DateRange': {},
'Vessel': {},
'DischargeDetail': {},
'Product': {},
'VolumeQty': {},
'VesselSizeCategory': {},
'UtilizationQty': {},
'VesselStatus': {},
'RequirementFulfillmentSchedule': {},
'Currency': {},
'DistanceQty': {},
'ScheduleChange': {},
'Long': {},
'Short': {},
'SpeedQty': {},
'RateQty': {},
'WaitDetail': {},
'VesselPortIncompatability': {}
}
async def create_qty_id(qty):
return " ".join([str(qty['value']), qty['unit']])
async def create_cost_id(cost):
return " ".join([str(cost['value']), cost['currency']['name']])
async def create_ID_object(obje, schema, return_type):
new_object = obje.copy()
fieldType = schema.get_type(return_type)
for key, val in new_object.items():
subFieldType = fieldType.fields[key].type
logger.debug("subFieldType: %s", subFieldType)
if isinstance(subFieldType, gql.GraphQLNonNull):
subFieldType = subFieldType.of_type
if isinstance(subFieldType, gql.GraphQLScalarType):
new_object[key] = str(val)
if isinstance(val, str):
new_object[key] = val
elif isinstance(val, datetime):
new_object[key] = str(val.date())
else:
new_object[key] = str(val)
continue
elif isinstance(subFieldType, gql.GraphQLObjectType):
new_object[key] = await create_instance_id(val, subFieldType.name)
continue
elif isinstance(subFieldType, gql.GraphQLList):
if isinstance(subFieldType.of_type, gql.GraphQLNonNull):
subFieldType = subFieldType.of_type
new_object[key] = [await create_instance_id(k, subFieldType.of_type.name) for k in val]
continue
return new_object
async def create_date_range_id(dateRange):
return str(dateRange['startDate'].timestamp()) + "-" + str(dateRange['endDate'].timestamp())
async def create_instance_id(instance, return_type):
logger.debug("creating instance id")
if return_type in QTYS:
return await create_qty_id(instance)
elif return_type == "CostQty":
return await create_cost_id(instance)
elif return_type == "Currency":
return instance['name']
elif return_type == "DateRange":
return await create_date_range_id(instance)
elif 'id' in instance:
return instance['id']
else:
return None
async def get_type_name(typ):
if isinstance(typ, gql.GraphQLObjectType):
return typ.name
elif isinstance(typ, gql.GraphQLNonNull):
return await get_type_name(typ.of_type)
elif isinstance(typ, gql.GraphQLList):
return await get_type_name(typ.of_type)
else:
logger.warn("Cannot Resolve Type Name of {}".format(typ))
return None
async def add_generic_instance(instance, client, info):
logger.debug("instance: %s", instance)
logger.debug("return type: %s", type(info.return_type))
type_name = await get_type_name(info.return_type)
mutation = create_mutation_singular(type_name)
logger.debug("mutation: %s", mutation)
schema = info.schema
instance_with_ids = await create_ID_object(instance, schema, type_name)
if 'id' not in instance:
instance_id = await create_instance_id(instance, type_name)
if instance_id is not None:
instance_with_ids['id'] = instance_id
variables = {'input': instance_with_ids}
logger.debug("vars: %s", variables)
res = await client.async_query(mutation, variables)
logger.debug("response: %s", res)
logger.debug("\n")
return instance
async def remove_instance(instance, client, info):
logger.info("inside remove instance")
async def pass_query_to_client(info, client):
query = gql.language.printer.print_ast(info.operation)
ret = await client.async_query(query)
return ret['data'][info.field_name]
async def test_stuff(value, info, **args):
return
# print()
# print('field_name', info.field_name)
# print('asts', info.field_asts)
# print(gql.language.printer.print_ast(info.operation))
# print('return_type', info.return_type)
# print('partent_type', info.parent_type)
# print('schema', info.schema)
# print('fragments', info.fragments)
# print('root_value', info.root_value)
# info.context
# print('operation', info.operation)
# print(type(gql.language.printer.print_ast(info.operation)))
# print('var_vals', info.variable_values)
# print('CostQtyValueResolver', info.schema.get_type(
# 'CostQty').fields['value'].resolver)
# print('CostQtyCurrencyResolver', info.schema.get_type(
# 'CostQty').fields['currency'].resolver)
# print(info.context)
# info.schema.get_type(
# 'CostQty').fields['value'].resolver(value, info, **args)
# print(106, type(info.schema.get_type('CostQty')))
# print(info)
# print(type(value))
# print(args)
# return args['argument']
def create_mutation_singular(type):
mutation = "mutation addData($input: Add{0}Input!){{ add{0}(input: $input ) }}".format(
type)
return mutation
import graphql
import logging
from utils import get_gql_type_nonull
import resolvers as res
# build_executable schema
#
# accepts schema_definition (string) and resolvers (object) in style of graphql-tools
# returns a schema ready for execution
logger = logging.getLogger(__name__)
def build_executable_schema(schema_definition, resolvers, scalars):
ast = graphql.parse(schema_definition)
schema = graphql.build_ast_schema(ast)
for scalar in scalars:
type = schema.get_type(scalar)
type.description = scalars[scalar]['description']
type.serialize = scalars[scalar]['serialize']
type.parse_literal = scalars[scalar]['parse_literal']
type.parse_value = scalars[scalar]['parse_value']
for typeName in resolvers:
fieldType = schema.get_type(typeName)
for fieldName in resolvers[typeName]:
if fieldType is graphql.GraphQLScalarType:
fieldType.fields[fieldName].resolver = resolvers[typeName][fieldName]
continue
field = fieldType.fields[fieldName]
field.resolver = resolvers[typeName][fieldName]
if not fieldType.fields:
continue
for remaining in fieldType.fields:
if not fieldType.fields[remaining].resolver:
logger.debug("remaining field name: %s.%s",
typeName, remaining)
logger.debug("remaining field type: %s",
get_gql_type_nonull(fieldType.fields[remaining].type))
if isinstance(get_gql_type_nonull(fieldType.fields[remaining].type), graphql.GraphQLScalarType):
logger.debug("scalar type: %s.%s", remaining,
fieldType.fields[remaining].type)
fieldType.fields[remaining].resolver = \
lambda value, info, _r=remaining, **args: value[_r]
elif isinstance(get_gql_type_nonull(fieldType.fields[remaining].type), graphql.GraphQLObjectType):
logger.debug("object type: %s.%s", remaining,
fieldType.fields[remaining].type)
fieldType.fields[remaining].resolver = \
lambda value, info, _r=remaining, **args: res.add_generic_instance(
value[_r], info.context['shipping_model_client'], info)
elif isinstance(get_gql_type_nonull(fieldType.fields[remaining].type), graphql.GraphQLList):
logger.debug("list type: %s.%s", remaining,
fieldType.fields[remaining].type)
fieldType.fields[remaining].resolver = \
lambda value, info, _r=remaining, **args: [res.add_generic_instance(
instance, info.context['shipping_model_client'], info) for instance in value[_r]]
logger.debug("remaining field resolver: %s",
fieldType.fields[remaining].resolver)
return schema