Skip to content

Instantly share code, notes, and snippets.

@martinapugliese
Last active June 3, 2021 21:59
Show Gist options
  • Star 39 You must be signed in to star a gist
  • Fork 14 You must be signed in to fork a gist
  • Save martinapugliese/cae86eb68f5aab59e87332725935fd5f to your computer and use it in GitHub Desktop.
Save martinapugliese/cae86eb68f5aab59e87332725935fd5f to your computer and use it in GitHub Desktop.
Some wrapper methods to deal with DynamoDB databases in Python, using boto3.
# Copyright (C) 2016 Martina Pugliese
from boto3 import resource
from boto3.dynamodb.conditions import Key
# The boto3 dynamoDB resource
dynamodb_resource = resource('dynamodb')
def get_table_metadata(table_name):
"""
Get some metadata about chosen table.
"""
table = dynamodb_resource.Table(table_name)
return {
'num_items': table.item_count,
'primary_key_name': table.key_schema[0],
'status': table.table_status,
'bytes_size': table.table_size_bytes,
'global_secondary_indices': table.global_secondary_indexes
}
def read_table_item(table_name, pk_name, pk_value):
"""
Return item read by primary key.
"""
table = dynamodb_resource.Table(table_name)
response = table.get_item(Key={pk_name: pk_value})
return response
def add_item(table_name, col_dict):
"""
Add one item (row) to table. col_dict is a dictionary {col_name: value}.
"""
table = dynamodb_resource.Table(table_name)
response = table.put_item(Item=col_dict)
return response
def delete_item(table_name, pk_name, pk_value):
"""
Delete an item (row) in table from its primary key.
"""
table = dynamodb_resource.Table(table_name)
response = table.delete_item(Key={pk_name: pk_value})
return
def scan_table_firstpage(table_name, filter_key=None, filter_value=None):
"""
Perform a scan operation on table. Can specify filter_key (col name) and its value to be filtered. This gets only first page of results in pagination. Returns the response.
"""
table = dynamodb_resource.Table(table_name)
if filter_key and filter_value:
filtering_exp = Key(filter_key).eq(filter_value)
response = table.scan(FilterExpression=filtering_exp)
else:
response = table.scan()
return response
def scan_table_allpages(table_name, filter_key=None, filter_value=None):
"""
Perform a scan operation on table. Can specify filter_key (col name) and its value to be filtered. This gets all pages of results.
Returns list of items.
"""
table = dynamodb_resource.Table(table_name)
if filter_key and filter_value:
filtering_exp = Key(filter_key).eq(filter_value)
response = table.scan(FilterExpression=filtering_exp)
else:
response = table.scan()
items = response['Items']
while True:
print len(response['Items'])
if response.get('LastEvaluatedKey'):
response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
items += response['Items']
else:
break
return items
def query_table(table_name, filter_key=None, filter_value=None):
"""
Perform a query operation on the table. Can specify filter_key (col name) and its value to be filtered. Returns the response.
"""
table = dynamodb_resource.Table(table_name)
if filter_key and filter_value:
filtering_exp = Key(filter_key).eq(filter_value)
response = table.query(KeyConditionExpression=filtering_exp)
else:
response = table.query()
return response
@jayangshu84
Copy link

jayangshu84 commented Jun 27, 2017

Thanks for this wrapper. Very helpful!
You are a 🌟 !
Oh! by the way, I unashamedly stole it for my work.

@PanditGensler
Copy link

PanditGensler commented Dec 24, 2017

Hello Martin ,
Need one help one the dynamo db with python lambda ,i am writing a python lambda function which gets the result from dynamo db, below is my patient table dynamo db structure .

{
"patient_id": 161,
"patientVisits": [
{
"Diagnosis": [
{
"Diagnosis_Completion": "Definitive",
"DiagnosisId": 100,
"ICD_Code": "test ICd code for dignosis Id 100"
}
],
"Follow_Up_Recommendation": "Follow up recommendation Test",
"visitId": 123
},
{
"Diagnosis": [
{
"Diagnosis_Completion": "Definitive",
"DiagnosisId": 100,
"ICD_Code": "test ICd code for dignosis Id 100"
}
],
"visitId": 124
}]}
Here patient table has key as patient_id and its has many attributes , one of the attribute is List of patientVisits which again has visitId and also includes the List of Diagnosis, my requirement is it to get the Diagnosis based on patientId and visitId, so i have tried to write the python lambda is like this

def list(event, context):
table = dynamodb.Table(os.environ['PATIENT_TABLE'])
patientId = 130
visitId = 124
# fetch all patientVisits from the database
result = table.query(
ProjectionExpression="#patientId, #pv.Diagnosis",
FilterExpression = '#pv.visitId = :visit_id',
ExpressionAttributeNames={ "#patientId": "patientId", "#pv" : "patientVisits"}, # Expression Attribute Names for Projection Expression only.
ExpressionAttributeValues= {":visit_id": visitId},
KeyConditionExpression=Key('patient_id').eq(patientId)
)
the above is giving me zero results, Can you please help me on this , if i change ProjectionExpression #pv.Diagnosis to #pv[0].Diagnosis aswell the FilterExpression then this works only gives me the zeroth index and also if the visitId matches with the zeroth index visitID, but i want to get the Diagnosis list from the patientVisits which is based on the visitId and patientId, can you please help me on this . struggling from the long time

Thanks you in advance!!!

@nathanleiby
Copy link

A simple, clear explanation. Thank you!

@MikeUdin
Copy link

Critical error in 87 line.
Next page going without filtering
response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
Works fine this code:
response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'],FilterExpression=filtering_exp)

@amaalzohny
Copy link

Thanks for this wrapper. Very helpful!

@adityarn
Copy link

MikeUdin is correct, line 87 needs modification. You need to add the FilterExpression

@emmanuelnk
Copy link

emmanuelnk commented Feb 27, 2019

Just another method I thought is missing, update item:

    def update_item(self, table_name, pk_name, pk_value, col_dict):
        """
        update one item (row) to table. col_dict is a dictionary {col_name: value}.
        """

        update_expression = 'SET {}'.format(','.join(f'#{k}=:{k}' for k in col_dict))
        expression_attribute_values = {f':{k}': v for k, v in col_dict.items()}
        expression_attribute_names = {f'#{k}': k for k in col_dict}

        table = dynamodb.Table(table_name)
        response = table.update_item(
            Key={'{}'.format(pk_name): pk_value},
            UpdateExpression=update_expression,
            ExpressionAttributeValues=expression_attribute_values,
            ExpressionAttributeNames=expression_attribute_names,
            ReturnValues='UPDATED_NEW',
        )

        return response

@carlosequiz
Copy link

Thank you very much! This helped me a lot at understanding how query works!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment