Last active
March 2, 2018 19:18
-
-
Save knowsuchagency/48d972e371ef5d10b8833a8bd619a669 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
async def configure_graphql(app):
    """
    Build the GraphQL schema and attach a GraphQL view to ``app``.

    Since our resolvers depend on the app's db connection, this
    co-routine must execute after that part of the application
    is configured.

    Args:
        app: the application object. It must already hold the database
            connection under ``app['connection']`` and expose the
            ``loop`` and ``router`` attributes used below.
    """
    connection = app['connection']

    def author_from_row(row, books=None):
        """Build an ``Author`` from a row dict, optionally with its books."""
        fields = dict(
            id=row['id'],
            first_name=row['first_name'],
            last_name=row['last_name'],
            age=row['age'])
        # Only pass ``books`` when the caller has them, so an author nested
        # inside a book is constructed exactly as before (without ``books``).
        if books is not None:
            fields['books'] = books
        return Author(**fields)

    def book_from_row(row):
        """Build a ``Book`` (with its nested ``Author``) from a row dict."""
        author = author_from_row(row['author'])
        return Book(
            id=row['id'],
            title=row['title'],
            published=row['published'],
            author=author)

    class Query(g.ObjectType):
        """Query type handed to ``g.Schema`` below."""

        # NOTE(review): no resolver for these two singular fields is defined
        # in this class -- confirm whether that is intentional.
        author = g.Field(Author)
        book = g.Field(Book)

        authors = g.List(
            Author,
            # the following will be passed as named
            # arguments to the resolver function.
            # Don't ask why; it took me forever to
            # figure it out. Despite its functionality,
            # graphene's documentation leaves a lot to be desired
            id=g.Int(),
            first_name=g.String(),
            last_name=g.String(),
            age=g.Int(),
            limit=g.Int(
                description='The amount of results you wish to be limited to'))

        books = g.List(
            Book,
            id=g.Int(),
            title=g.String(),
            published=g.String(),
            author_id=g.Int(
                description='The unique ID of the author in the database'),
            # Typo fixed ("with" -> "wish") so this matches the ``authors``
            # description; the text is visible to API consumers.
            limit=g.Int(
                description='The amount of results you wish to be limited to'))

        async def resolve_books(self,
                                info,
                                id=None,
                                title=None,
                                published=None,
                                author_id=None,
                                limit=None):
            """
            Resolve the ``books`` field.

            Every keyword argument is forwarded unchanged to
            ``fetch_books``; ``info`` is not used here.

            Returns:
                A list of ``Book`` objects, each carrying its ``Author``.
            """
            query_db = partial(
                fetch_books,
                connection,
                id=id,
                title=title,
                published=published,
                author_id=author_id,
                limit=limit)
            # ``None`` selects the loop's default executor; presumably
            # ``fetch_books`` is blocking and must stay off the event loop.
            fetched = await app.loop.run_in_executor(None, query_db)
            return [book_from_row(row) for row in fetched]

        async def resolve_authors(self,
                                  info,
                                  id=None,
                                  first_name=None,
                                  last_name=None,
                                  age=None,
                                  limit=None):
            """
            Resolve the ``authors`` field.

            Every keyword argument is forwarded unchanged to
            ``fetch_authors``; ``info`` is not used here.

            Returns:
                A list of ``Author`` objects, each carrying its ``books``.
            """
            query_db = partial(
                fetch_authors,
                connection,
                id=id,
                first_name=first_name,
                last_name=last_name,
                age=age,
                limit=limit)
            # ``None`` selects the loop's default executor; presumably
            # ``fetch_authors`` is blocking and must stay off the event loop.
            fetched = await app.loop.run_in_executor(None, query_db)

            authors = []
            for row in fetched:
                # Books nested under an author are built without an
                # ``author`` attribute, exactly as the rows provide them.
                books = [
                    Book(id=b['id'], title=b['title'], published=b['published'])
                    for b in row['books']
                ]
                authors.append(author_from_row(row, books=books))
            return authors

    # auto_camelcase=False keeps the snake_case field and argument names
    # (e.g. ``first_name``, ``author_id``) as written above.
    schema = g.Schema(query=Query, auto_camelcase=False)

    # create the view
    executor = AsyncioExecutor(loop=app.loop)
    gql_view = GraphQLView(
        schema=schema,
        executor=executor,
        graphiql=True,
        enable_async=True,
    )

    # attach the view to the app router; '*' registers it for every method
    app.router.add_route(
        '*',
        '/graphql',
        gql_view,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment