Skip to content

Instantly share code, notes, and snippets.

@adhorn
Last active April 25, 2018 03:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adhorn/6c88dea22c4e5fdd32bcaabc9a45c989 to your computer and use it in GitHub Desktop.
Save adhorn/6c88dea22c4e5fdd32bcaabc9a45c989 to your computer and use it in GitHub Desktop.
from __future__ import unicode_literals
import sys
import logging
import boto3
import os
# Use the root logger and emit everything from DEBUG level upwards.
log = logging.getLogger()
log.setLevel(logging.DEBUG)
# Put ./vendored first on the module search path so that packages bundled
# there (presumably simplejson, imported just below) take precedence.
sys.path.insert(0, './vendored')
import simplejson as json
# simplejson can serialize Decimal objects, which the
# standard json package cannot.
# Region is taken from the AWS_REGION environment variable; a KeyError is
# raised at import time if it is not set.
region = os.environ["AWS_REGION"]
# DynamoDB resource and table handle are created once at module import
# rather than on every call.
dynamodb = boto3.resource('dynamodb', region_name=region)
table = dynamodb.Table('MyGlobalTable')
def get_from_dynamo(event):
    """Fetch one item from the DynamoDB table by its ``item_id``.

    The id is read from ``event['pathParameters']['item_id']``.

    Returns the value stored under ``'Item'`` in the ``get_item`` response.

    Raises ``KeyError`` when the event has no such path parameter or when
    the response carries no ``'Item'`` entry.
    """
    log.debug("Received in get_from_dynamo: {}".format(json.dumps(event)))
    requested_id = event['pathParameters']['item_id']
    log.debug("item_id: {}".format(requested_id))

    lookup_key = {'item_id': requested_id}
    result = table.get_item(Key=lookup_key)
    return result['Item']
def get_item(event, context):
    """Lambda entry point: look up an item and wrap it in an HTTP-style response.

    Args:
        event: Invocation event; the item id is expected under
            ``event['pathParameters']['item_id']``.
        context: Lambda context object (unused).

    Returns:
        A dict with ``statusCode`` and a JSON-encoded ``body``:
        * 200 with ``{"item_id": <item>}`` when the item exists,
        * 400 when the ``item_id`` path parameter is missing,
        * 404 when no item matches the id.
    """
    log.debug("Received event in get_item: %s", json.dumps(event))

    # Reject requests without the path parameter up front instead of letting
    # the lookup fail with an unhandled KeyError.
    path_params = event.get('pathParameters') or {}
    if 'item_id' not in path_params:
        return {
            "statusCode": 400,
            "body": json.dumps({"message": "Missing path parameter: item_id"}),
        }

    try:
        item = get_from_dynamo(event)
    except KeyError:
        # get_from_dynamo indexes the DynamoDB response with ['Item'];
        # that key is absent when no record matches the requested id.
        return {
            "statusCode": 404,
            "body": json.dumps({"message": "Item not found"}),
        }

    body = {
        "item_id": item,
    }
    return {
        "statusCode": 200,
        "body": json.dumps(body),
    }
from __future__ import unicode_literals
import sys
import logging
import boto3
import os
# Use the root logger and emit everything from DEBUG level upwards.
log = logging.getLogger()
log.setLevel(logging.DEBUG)
# Put ./vendored first on the module search path so that packages bundled
# there (presumably simplejson, imported just below) take precedence.
sys.path.insert(0, './vendored')
import simplejson as json
def makehealthcheckcalls():
    """Return the health status configured via the ``STATUS`` env variable.

    The value is handed back unchanged (as a string). A ``KeyError`` is
    raised if ``STATUS`` is not set.
    """
    configured_status = os.environ["STATUS"]
    return configured_status
def lambda_handler(event, context):
    """Health-check Lambda entry point.

    Args:
        event: Invocation event; only logged.
        context: Lambda context object (unused).

    Returns:
        A dict whose ``statusCode`` is whatever ``makehealthcheckcalls()``
        reports.
    """
    # The message previously named "get_profile", a different handler,
    # which made log lines from this function misleading.
    log.debug("Received event in lambda_handler: %s", json.dumps(event))
    return {
        "statusCode": makehealthcheckcalls(),
    }
import json
import logging
import boto3
import os
import uuid
# Use the root logger and emit everything from DEBUG level upwards.
log = logging.getLogger()
log.setLevel(logging.DEBUG)
# Region is taken from the AWS_REGION environment variable; a KeyError is
# raised at import time if it is not set.
region = os.environ["AWS_REGION"]
# DynamoDB resource and table handle are created once at module import
# rather than on every call.
dynamodb = boto3.resource('dynamodb', region_name=region)
table = dynamodb.Table('MyGlobalTable')
def put_to_dynamo(event):
    """Store an item in the DynamoDB table and return its id.

    Args:
        event: Invocation event whose ``body`` is a JSON object that may
            contain an ``item_id`` field.

    Returns:
        The ``item_id`` that was written: the one supplied in the request
        body, or a freshly generated UUID string when none was supplied.
    """
    log.debug("Received in put_to_dynamo: %s", json.dumps(event))

    # The body may be absent or null (e.g. a POST without payload); treat
    # that as an empty JSON object rather than crashing in json.loads.
    data = json.loads(event.get("body") or "{}")

    item_id = data.get("item_id")
    if item_id is None or item_id == "":
        # Without an id the write would be attempted with a null/empty key;
        # generate one instead so the caller always gets a usable id back.
        item_id = str(uuid.uuid4())

    table.put_item(
        Item={
            'item_id': item_id,
        }
    )
    return item_id
def create_item(event, context):
    """Lambda entry point: persist an item and echo its id back.

    Args:
        event: Invocation event, passed through to ``put_to_dynamo``.
        context: Lambda context object (unused).

    Returns:
        A dict with ``statusCode`` 200 and a JSON-encoded ``body`` of the
        form ``{"item_id": "<id>"}``.
    """
    log.debug("Event in create_item: %s", json.dumps(event))
    item_id = put_to_dynamo(event)

    # Previously formatted as " {}", which prepended a space so the id
    # returned to the client did not match the id that was stored.
    body = {
        "item_id": "{}".format(item_id),
    }
    return {
        "statusCode": 200,
        "body": json.dumps(body),
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment