Skip to content

Instantly share code, notes, and snippets.

@kcwinner
Last active October 13, 2023 04:59
Show Gist options
  • Star 13 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kcwinner/20742479a42d9caa9b4a006504289c9f to your computer and use it in GitHub Desktop.
Save kcwinner/20742479a42d9caa9b4a006504289c9f to your computer and use it in GitHub Desktop.
Call AppSync GraphQL from Python Lambda Function
from lib import appsync
def lambda_handler(event, context):
print(event)
input = {
'name': 'test123',
'description': 'some cool description'
}
query = """
mutation createTodo(
$input: CreateTodoInput!
) {
createTodo(input: $input){
id
name
description
}
}
"""
res = appsync.query(query, { 'input': input })
print(res)
import requests
import json
import os
import boto3
from requests_aws_sign import AWSV4Sign
def query(query, variables: dict):
session = boto3.session.Session()
credentials = session.get_credentials()
region = session.region_name or 'us-east-2'
endpoint = os.environ.get('APPSYNC_URL', None)
headers={"Content-Type": "application/json"}
payload = {"query": query, 'variables': variables}
appsync_region = __parse_region_from_url(endpoint) or region
auth=AWSV4Sign(credentials, appsync_region, 'appsync')
try:
response = requests.post(
endpoint,
auth=auth,
json=payload,
headers=headers
).json()
if 'errors' in response:
print('Error attempting to query AppSync')
print(response['errors'])
else:
return response
except Exception as exception:
print('Error with Mutation')
print(exception)
return None
def __parse_region_from_url(url):
"""Parses the region from the appsync url so we call the correct region regardless of the session or the argument"""
# Example URL: https://xxxxxxx.appsync-api.us-east-2.amazonaws.com/graphql
split = url.split('.')
if 2 < len(split):
return split[2]
return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment