Skip to content

Instantly share code, notes, and snippets.

@achimnol
Last active September 14, 2017 10:24
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save achimnol/5b91b046ec6584a11c67286829326477 to your computer and use it in GitHub Desktop.
Save achimnol/5b91b046ec6584a11c67286829326477 to your computer and use it in GitHub Desktop.
GraphQL Sample in Python
import asyncio
import base64
import enum
import secrets
from pprint import pprint
import graphene
from graphql.execution.executors.asyncio import AsyncioExecutor
keypair_registry = {
1: ('AKIATESTING', 'zxcvzxcvzxcvzcxv'),
}
class CreateKeyPair(graphene.Mutation):
class Arguments:
user_id = graphene.Int(required=True)
ok = graphene.Boolean()
keypair = graphene.Field(lambda: KeyPair)
@staticmethod
async def mutate(root, info, user_id):
if user_id in keypair_registry:
return CreateKeyPair(ok=False, keypair=None)
ak = 'AKIA' + base64.b32encode(secrets.token_bytes(10)).decode('ascii')
sk = secrets.token_urlsafe(30)
kp = KeyPair(user_id=user_id, access_key=ak, secret_key=sk)
keypair_registry[user_id] = (ak, sk)
return CreateKeyPair(ok=True, keypair=kp)
class KeyPair(graphene.ObjectType):
user_id = graphene.Int()
access_key = graphene.String()
secret_key = graphene.String()
class Mutation(graphene.ObjectType):
create_keypair = CreateKeyPair.Field()
class Query(graphene.ObjectType):
keypair = graphene.Field(KeyPair, user_id=graphene.Int(required=True))
async def resolve_keypair(self, info, user_id):
ret = keypair_registry.get(user_id)
if ret is None:
return None
return KeyPair(user_id=user_id, access_key=ret[0], secret_key=ret[1])
async def run_sample_query(schema, executor, msg, q):
print('=========' + msg + '===========')
print(q)
print('-------------------------------')
result = await schema.execute(q, executor, return_promise=True)
if result.errors:
print(result.errors)
else:
pprint(result.data)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
executor = AsyncioExecutor(loop=loop)
schema = graphene.Schema(
query=Query,
mutation=Mutation,
auto_camelcase=False)
q = '''query {
keypair(user_id:1) {
access_key
secret_key
}
session(sess_id:"abc") {
owner_access_key
}
}'''
loop.run_until_complete(
run_sample_query(schema, executor, 'getting an object', q))
q = '''query {
keypair(user_id:2) {
access_key
secret_key
}
}'''
loop.run_until_complete(
run_sample_query(schema, executor,
'trying to get non-existent object', q))
q = '''mutation MyFirstMutation {
create_keypair(user_id:2) {
keypair {
access_key
secret_key
}
ok
}
}'''
loop.run_until_complete(
run_sample_query(schema, executor,
'inserting an object', q))
q = '''query {
keypair(user_id:2) {
access_key
secret_key
}
}'''
loop.run_until_complete(
run_sample_query(schema, executor,
'getting the inserted object', q))
q = '''mutation MySecondMutation {
create_keypair(user_id:2) {
keypair {
access_key
secret_key
}
ok
}
}'''
loop.run_until_complete(
run_sample_query(schema, executor,
'trying to insert duplicate object', q))
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment