Skip to content

Instantly share code, notes, and snippets.

@parijatmishra
Created February 10, 2016 11:51
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save parijatmishra/6560564a681c17e5ecf6 to your computer and use it in GitHub Desktop.
Save parijatmishra/6560564a681c17e5ecf6 to your computer and use it in GitHub Desktop.
Using DynamoDB Local Secondary Indexes - example in Python and boto3
#!/usr/bin/env python
# ddb_lsi_example: A sample program to show how to use Local Secondary Indexes
# in DynamoDB, using Python and boto3.
#
# Copyright 2016 Parijat Mishra
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import boto3
import random
import time
## Uncomment only one of the 'client' lines below
## Real dynamoDB -- Note: this may cost you money!
# client = boto3.client('dynamodb')
## DynamoDB local -- for testing. Set the port in endpoint_url to whatever port your
## DDB local is listening on. The region is just for simulation and to satisfy the API
client = boto3.client('dynamodb', endpoint_url='http://localhost:8000', region_name='us-west-2')
# Constants
TABLE_NAME='UserImages'
LSI_NAME='LatestLkes'
userids = ['parijat', 'james', 'marcus']
N = 10 # each user has uploaded many images previously
M = 5 # each user will randomly like these many other users' images
Y = 5 # for each user, we want to list these many most recent liked images
def create_table():
# skip if table already exists
try:
response = client.describe_table(TableName=TABLE_NAME)
# table exists...bail
print "Table [{}] already exists. Skipping table creation.".format(TABLE_NAME)
return
except:
pass # no table... good
print "Creating table [{}]".format(TABLE_NAME)
response = client.create_table(
TableName=TABLE_NAME,
KeySchema=[
{
'KeyType': 'HASH',
'AttributeName': 'userid'
},
{
'KeyType': 'RANGE',
'AttributeName': 'imageid'
}
],
LocalSecondaryIndexes=[
{
'IndexName': LSI_NAME,
'KeySchema': [
{
'KeyType': 'HASH',
'AttributeName': 'userid'
},
{
'KeyType': 'RANGE',
'AttributeName': 'last_like_time'
}
],
# Note: since we are projecting all the attributes of the table
# into the LSI, we could have set ProjectionType=ALL and
# skipped specifying the NonKeyAttributes
'Projection': {
'ProjectionType': 'INCLUDE',
'NonKeyAttributes': ['imageid', 'last_like_userid', 'total_likes']
}
}
],
AttributeDefinitions=[
{
'AttributeName': 'userid',
'AttributeType': 'S'
},
{
'AttributeName': 'imageid',
'AttributeType': 'S'
},
{
'AttributeName': 'last_like_time', # timestamp in secs
'AttributeType': 'N'
}
],
ProvisionedThroughput={
'ReadCapacityUnits': 2,
'WriteCapacityUnits': 2
}
)
print "Waiting for table [{}] to be created".format(TABLE_NAME)
waiter = client.get_waiter('table_exists')
waiter.wait(TableName=TABLE_NAME)
# if no exception, continue
print "Table created"
def populate_UserImageInfo():
# assume each user has posted N images, with imageids named
# <userid>-<#>, where # is a number between 1 and N inclusive.
#
# each user randomly likes M other images across all images
#
# since this is random, a user may like the same image more than once,
# but hey, this is a simulation
random.seed()
print "Updating likes..."
print "%10s | %15s | %15s | %24s | %10s | TWCU | IWCU" % ('Userid', 'ImageId', 'LastLikeUserId', 'LastLikeTime', 'TotalLikes')
for last_like_userid in userids:
candidate_userids = [u for u in userids if u != last_like_userid]
for i in xrange(M):
time.sleep(1) # pace the writes to respect provisioned capacity, and to ensure last_like_time changes
userid = random.choice(candidate_userids)
num = random.randint(1, N)
imageid = "{}-{}".format(userid, str(num))
last_like_time = int(time.time())
# insert or update a record with PK={userid, imageid}
# with last_like_userid, last_like_time and total_likes.
# total_likes is incremented from its existing value.
response = client.update_item(
TableName=TABLE_NAME,
Key={
'userid': {"S": userid},
'imageid': {"S": imageid}
},
ReturnValues='ALL_NEW',
UpdateExpression="SET last_like_userid=:u, last_like_time=:t ADD total_likes :i",
ExpressionAttributeValues={
":u": {"S": last_like_userid},
":t": {"N": str(last_like_time)},
":i": {"N": "1"}
},
ReturnConsumedCapacity='INDEXES'
)
if response.has_key('ConsumedCapacity'):
# write capacity units - table
t_wcu = response['ConsumedCapacity']['Table']['CapacityUnits']
# write capacity units - index
i_wcu = response['ConsumedCapacity']['LocalSecondaryIndexes'][LSI_NAME]['CapacityUnits']
else:
# DDB local does not return consumed capacity
t_wcu = 0.0
i_wcu = 0.0
# New values of attrs. userid, imageid, last_like_userid,
# last_like_time should be the same as we specified above
new_attrs = response['Attributes']
# new_attrs is a nested dict. E.g.
# new_attrs = {"userid": {"S": "pariajt"}, "imageid": {"S": "parijat-1"}, ...}
n_userid = new_attrs['userid']['S']
n_imageid = new_attrs['imageid']['S']
n_last_like_userid = new_attrs['last_like_userid']['S']
n_last_like_time = float(new_attrs['last_like_time']['N'])
n_last_like_time_str = time.ctime(n_last_like_time)
n_total_likes = new_attrs['total_likes']['N']
print "%10s | %15s | %15s | %24s | %10s | %4.0f | %4.0f" % \
(n_userid, n_imageid, n_last_like_userid, n_last_like_time_str, n_total_likes, t_wcu, i_wcu)
def query_LatestLikes():
# for each user, query last Y liked images
for userid in userids:
print
print "Querying latest [{}] liked images for user [{}]".format(Y, userid)
response = client.query(
TableName=TABLE_NAME,
IndexName=LSI_NAME,
Select='ALL_PROJECTED_ATTRIBUTES',
ConsistentRead=True,
ReturnConsumedCapacity='INDEXES',
ScanIndexForward=False, # return results in descending order of sort key
Limit=Y,
KeyConditionExpression='userid = :userid',
ExpressionAttributeValues={":userid": {"S": userid}}
)
if response.has_key('ConsumedCapacity'):
t_rcu = response['ConsumedCapacity']['Table']['CapacityUnits']
i_rcu = response['ConsumedCapacity']['LocalSecondaryIndexes'][LSI_NAME]['CapacityUnits']
else:
t_rcu = 0.0
i_rcu = 0.0
print "Query consumed [{}] RCUs on table, [{}] RCUs on Index.".format(t_rcu, i_rcu)
print
print "%15s | %15s | %24s | %10s" % ('ImageId', 'LastLikeUserId', 'LastLikeTime', 'TotalLikes')
for item in response['Items']:
imageid = item['imageid']['S']
last_like_userid = item['last_like_userid']['S']
last_like_time = float(item['last_like_time']['N'])
last_like_time_str = time.ctime(last_like_time)
total_likes = item['total_likes']['N']
print "%15s | %15s | %24s | %10s" % (imageid, last_like_userid, last_like_time_str, total_likes)
if __name__ == "__main__":
create_table()
populate_UserImageInfo()
query_LatestLikes()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment