Skip to content

Instantly share code, notes, and snippets.

@witt3rd
Created March 20, 2019 17:44
Show Gist options
  • Save witt3rd/1282fb1c7b3bd2a5f3fc9ff4348d4f81 to your computer and use it in GitHub Desktop.
Save witt3rd/1282fb1c7b3bd2a5f3fc9ff4348d4f81 to your computer and use it in GitHub Desktop.

Ingesting Hierarchical Data

This will work for any model that you hook it up to; you just need to define the types in your service schema.

It inspects the metadata that GraphQL carries with each query to figure out which fields correspond to which type, and uses that to construct the mutations. The types have to be the same as those in the model in Q, because it assumes the mutation boilerplate will look like:

def create_mutation_singular(type):
    """Build the GraphQL mutation document that adds one ``type`` instance.

    The document declares a single ``$input`` variable of type
    ``Add<type>Input!`` and forwards it to the ``add<type>`` mutation field.
    """
    template = "mutation addData($input: Add{0}Input!){{ add{0}(input: $input ) }}"
    return template.format(type)

One other improvement that can be made: right now it only adds one instance at a time (and it’s not heavily tested on arrays of things), so adding another mutation builder for adding multiple X at once would make it faster in this case. Such a builder can easily be dropped into graphql_tools.py.

One small change (it shouldn’t come up often) hasn’t been merged yet: if you query for fields that you are not passing in as data to be uploaded, it currently errors out.

Example Resolvers and Helpers

Note the function, add_generic_instance.

from CKGClient import CKGClient
import json
import string
import aiohttp
import base64
import pandas as pd
import io
import logging
from datetime import datetime, timedelta, date
import copy
import utils
import graphql as gql
from asyncio import ensure_future
import copy

# GraphQL type names treated as quantities: create_instance_id checks
# membership in this list and, on a hit, builds the id with create_qty_id.
# Keep one entry per line with a trailing comma -- Python silently
# concatenates adjacent string literals, so a missing comma merges two names
# into one bogus entry that never matches.
QTYS = [
    'RateQty',
    'VolumeQty',
    'DensityQty',
    'WeightQty',
    'SizeQty',
    'DistanceQty',
    'TimeQty',
    'MassQty',
    'SpeedQty',
]


logger = logging.getLogger(__name__)

# Resolver map in graphql-tools style: type name -> {field name -> resolver}.
# Only Query and Mutation carry explicit resolvers; every other type is
# registered with its own empty map.
# NOTE(review): the query resolver reads info.context['shipping_model_test']
# while the mutations read info.context['shipping_model_client'] -- confirm
# the differing context key is intentional.
resolvers = {
    'Query': {
        'fleetSchedule': lambda value, info, id: pass_query_to_client(
            info, info.context['shipping_model_test']),
    },
    'Mutation': {
        'addFleetSchedule': lambda value, info, fleetSchedule: add_generic_instance(
            fleetSchedule, info.context['shipping_model_client'], info),
        'addVesselPortIncompatability': lambda value, info, vesselPortIncompatability: add_generic_instance(
            vesselPortIncompatability, info.context['shipping_model_client'], info),
    },
    # A fresh dict per type (not a shared one), in the original order.
    **{
        type_name: {}
        for type_name in (
            'TimeCharterVesselSchedule',
            'CostQty',
            'VesselAction',
            'FleetSchedule',
            'Port',
            'LoadDetail',
            'VoyageDetail',
            'Requirement',
            'DateRange',
            'Vessel',
            'DischargeDetail',
            'Product',
            'VolumeQty',
            'VesselSizeCategory',
            'UtilizationQty',
            'VesselStatus',
            'RequirementFulfillmentSchedule',
            'Currency',
            'DistanceQty',
            'ScheduleChange',
            'Long',
            'Short',
            'SpeedQty',
            'RateQty',
            'WaitDetail',
            'VesselPortIncompatability',
        )
    },
}


async def create_qty_id(qty):
    """Return the id string for a quantity, formatted ``"<value> <unit>"``.

    ``qty['value']`` is stringified with ``str``; ``qty['unit']`` is joined
    as-is and therefore must already be a string.
    """
    parts = (str(qty['value']), qty['unit'])
    return " ".join(parts)


async def create_cost_id(cost):
    """Return the id string for a cost, formatted ``"<value> <currency name>"``.

    ``cost['value']`` is stringified with ``str``; the currency name is read
    from ``cost['currency']['name']`` and must already be a string.
    """
    amount = str(cost['value'])
    currency_name = cost['currency']['name']
    return " ".join((amount, currency_name))


async def create_ID_object(obje, schema, return_type):
    """Return a shallow copy of ``obje`` with nested values replaced by ids.

    Each key of ``obje`` is looked up in the fields of the schema type named
    ``return_type`` (the lookup fails if the key is not a field of that
    type). After stripping one outer ``GraphQLNonNull`` wrapper:

    * scalar fields: ``str`` values are kept, ``datetime`` values become the
      string form of their date, anything else is passed through ``str``;
    * object fields: replaced by ``create_instance_id`` of the nested value;
    * list fields: replaced by a list of ``create_instance_id`` results, one
      per element, using the (non-null-unwrapped) element type name;
    * any other field kind is left untouched.

    NOTE(review): a ``None`` scalar value is turned into the string "None".
    """
    converted = obje.copy()
    parent_type = schema.get_type(return_type)

    # Only existing keys are reassigned, so iterating the copy is safe.
    for field_name, raw in converted.items():
        field_type = parent_type.fields[field_name].type
        logger.debug("subFieldType: %s", field_type)
        if isinstance(field_type, gql.GraphQLNonNull):
            field_type = field_type.of_type

        if isinstance(field_type, gql.GraphQLScalarType):
            if isinstance(raw, str):
                replacement = raw
            elif isinstance(raw, datetime):
                replacement = str(raw.date())
            else:
                replacement = str(raw)
            converted[field_name] = replacement
        elif isinstance(field_type, gql.GraphQLObjectType):
            converted[field_name] = await create_instance_id(raw, field_type.name)
        elif isinstance(field_type, gql.GraphQLList):
            element_type = field_type.of_type
            if isinstance(element_type, gql.GraphQLNonNull):
                element_type = element_type.of_type
            element_ids = []
            for element in raw:
                element_ids.append(
                    await create_instance_id(element, element_type.name))
            converted[field_name] = element_ids

    return converted


async def create_date_range_id(dateRange):
    """Return the id string for a date range: ``"<start ts>-<end ts>"``.

    The two parts are ``str`` of ``dateRange['startDate'].timestamp()`` and
    ``dateRange['endDate'].timestamp()``, so both values must expose a
    ``timestamp()`` method (e.g. ``datetime`` objects).
    """
    start_stamp = str(dateRange['startDate'].timestamp())
    end_stamp = str(dateRange['endDate'].timestamp())
    return "-".join((start_stamp, end_stamp))


async def create_instance_id(instance, return_type):
    """Derive the id for ``instance`` based on its GraphQL type name.

    Quantity types (names listed in ``QTYS``), ``CostQty``, ``Currency`` and
    ``DateRange`` get an id computed from their contents. Any other type
    uses ``instance['id']`` when that key is present, else ``None``.
    """
    logger.debug("creating instance id")

    if return_type in QTYS:
        return await create_qty_id(instance)
    if return_type == "CostQty":
        return await create_cost_id(instance)
    if return_type == "Currency":
        return instance['name']
    if return_type == "DateRange":
        return await create_date_range_id(instance)

    # Fallback: reuse an id the caller already supplied, if any.
    return instance['id'] if 'id' in instance else None


async def get_type_name(typ):
    """Return the name of the object type underlying ``typ``.

    ``GraphQLNonNull`` and ``GraphQLList`` wrappers are stripped (any
    nesting depth) via ``of_type``. If what remains is a
    ``GraphQLObjectType`` its ``name`` is returned; otherwise a warning is
    logged and ``None`` is returned.
    """
    # Both wrapper kinds expose the wrapped type as ``of_type``.
    while isinstance(typ, (gql.GraphQLNonNull, gql.GraphQLList)):
        typ = typ.of_type

    if isinstance(typ, gql.GraphQLObjectType):
        return typ.name

    # Logger.warn is a deprecated alias; use warning() with lazy formatting.
    logger.warning("Cannot Resolve Type Name of %s", typ)
    return None


async def add_generic_instance(instance, client, info):
    """Send an ``add<Type>`` mutation for ``instance`` through ``client``.

    The type name comes from ``info.return_type``. Nested values of
    ``instance`` are replaced by ids via ``create_ID_object``, and when
    ``instance`` has no ``'id'`` key one is derived with
    ``create_instance_id`` (added only if it is not ``None``).

    The client's response is only logged; the original ``instance`` is
    returned unchanged.
    """
    logger.debug("instance: %s", instance)
    logger.debug("return type: %s", type(info.return_type))

    type_name = await get_type_name(info.return_type)
    mutation = create_mutation_singular(type_name)
    logger.debug("mutation: %s", mutation)

    payload = await create_ID_object(instance, info.schema, type_name)
    if 'id' not in instance:
        generated_id = await create_instance_id(instance, type_name)
        if generated_id is not None:
            payload['id'] = generated_id

    variables = {'input': payload}
    logger.debug("vars: %s", variables)

    response = await client.async_query(mutation, variables)
    logger.debug("response: %s", response)
    logger.debug("\n")
    return instance


async def remove_instance(instance, client, info):
    """Placeholder for instance removal.

    Currently only logs a message; none of the arguments are used and
    nothing is removed. Returns ``None``.
    """
    logger.info("inside remove instance")


async def pass_query_to_client(info, client):
    """Forward the current operation to ``client`` and return this field's data.

    The operation AST on ``info`` is printed back to query text, sent with
    ``client.async_query``, and the entry for ``info.field_name`` is read
    from the response's ``'data'`` mapping.
    """
    operation_text = gql.language.printer.print_ast(info.operation)
    response = await client.async_query(operation_text)
    data = response['data']
    return data[info.field_name]


async def test_stuff(value, info, **args):
    return
    # print()
    # print('field_name', info.field_name)
    # print('asts', info.field_asts)
    # print(gql.language.printer.print_ast(info.operation))
    # print('return_type', info.return_type)
    # print('partent_type', info.parent_type)
    # print('schema', info.schema)
    # print('fragments', info.fragments)
    # print('root_value', info.root_value)
    # info.context
    # print('operation', info.operation)
    # print(type(gql.language.printer.print_ast(info.operation)))
    # print('var_vals', info.variable_values)
    # print('CostQtyValueResolver', info.schema.get_type(
    # 'CostQty').fields['value'].resolver)
    # print('CostQtyCurrencyResolver', info.schema.get_type(
    # 'CostQty').fields['currency'].resolver)
    # print(info.context)
    # info.schema.get_type(
    # 'CostQty').fields['value'].resolver(value, info, **args)
    # print(106, type(info.schema.get_type('CostQty')))
    # print(info)
    # print(type(value))
    # print(args)
    # return args['argument']


def create_mutation_singular(type):
    """Build the GraphQL mutation document that adds one ``type`` instance.

    The document declares a single ``$input`` variable of type
    ``Add<type>Input!`` and forwards it to the ``add<type>`` mutation field.
    """
    template = "mutation addData($input: Add{0}Input!){{ add{0}(input: $input ) }}"
    return template.format(type)

Improvements to graphql_tools.py

import graphql
import logging
from utils import get_gql_type_nonull
import resolvers as res

# build_executable schema
#
# accepts schema_definition (string) and resolvers (object) in style of graphql-tools
# returns a schema ready for execution

logger = logging.getLogger(__name__)


def build_executable_schema(schema_definition, resolvers, scalars):
    """Build a schema from SDL text and attach scalars and resolvers to it.

    Args:
        schema_definition: Schema definition language source (string).
        resolvers: Mapping of type name -> {field name -> resolver callable}.
            Every listed type also gets fallback resolvers on its fields
            that have none (see below).
        scalars: Mapping of scalar type name -> dict with the keys
            ``'description'``, ``'serialize'``, ``'parse_literal'`` and
            ``'parse_value'``, copied onto the matching schema type.

    Returns:
        The schema object, modified in place and ready for execution.

    Fallback resolvers, chosen from the field type as returned by
    ``get_gql_type_nonull`` (presumably the type with its non-null wrapper
    removed -- confirm in utils):

    * scalar: return ``value[<field name>]``;
    * object: call ``res.add_generic_instance`` on ``value[<field name>]``;
    * list: call ``res.add_generic_instance`` on each element.

    Fields of any other kind keep whatever resolver they had.
    """
    ast = graphql.parse(schema_definition)
    schema = graphql.build_ast_schema(ast)

    for scalar_name in scalars:
        scalar_impl = scalars[scalar_name]
        scalar_type = schema.get_type(scalar_name)
        scalar_type.description = scalar_impl['description']
        scalar_type.serialize = scalar_impl['serialize']
        scalar_type.parse_literal = scalar_impl['parse_literal']
        scalar_type.parse_value = scalar_impl['parse_value']

    for type_name in resolvers:
        target_type = schema.get_type(type_name)
        type_resolvers = resolvers[type_name]

        # Explicit resolvers first. (The former ``is GraphQLScalarType``
        # check compared an instance with a class, was always False, and
        # guarded an identical assignment, so it has been removed.)
        for field_name in type_resolvers:
            target_type.fields[field_name].resolver = type_resolvers[field_name]

        if not target_type.fields:
            continue

        for field_name, field in target_type.fields.items():
            if field.resolver:
                continue

            unwrapped = get_gql_type_nonull(field.type)
            logger.debug("remaining field name: %s.%s", type_name, field_name)
            logger.debug("remaining field type: %s", unwrapped)

            # ``_r=field_name`` binds the current name at definition time;
            # a plain closure would see only the last loop value.
            if isinstance(unwrapped, graphql.GraphQLScalarType):
                logger.debug("scalar type: %s.%s", field_name, field.type)
                field.resolver = \
                    lambda value, info, _r=field_name, **args: value[_r]
            elif isinstance(unwrapped, graphql.GraphQLObjectType):
                logger.debug("object type: %s.%s", field_name, field.type)
                field.resolver = \
                    lambda value, info, _r=field_name, **args: res.add_generic_instance(
                        value[_r], info.context['shipping_model_client'], info)
            elif isinstance(unwrapped, graphql.GraphQLList):
                logger.debug("list type: %s.%s", field_name, field.type)
                # NOTE(review): this yields one result of
                # res.add_generic_instance per element without awaiting it
                # here -- confirm the executor resolves those awaitables.
                field.resolver = \
                    lambda value, info, _r=field_name, **args: [res.add_generic_instance(
                        instance, info.context['shipping_model_client'], info) for instance in value[_r]]

            logger.debug("remaining field resolver: %s", field.resolver)

    return schema
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment