Skip to content

Instantly share code, notes, and snippets.

@alicefuzier
Last active April 10, 2018 09:32
Show Gist options
  • Save alicefuzier/61946cc6ebbf4b273b70c589ede56a2e to your computer and use it in GitHub Desktop.
Save alicefuzier/61946cc6ebbf4b273b70c589ede56a2e to your computer and use it in GitHub Desktop.
import boto3
from multiprocessing import Pool
from tenacity import retry
from tenacity.wait import wait_random
from tqdm import tqdm
@retry(wait=wait_random(min=1, max=300))
def update_data(table_name, id):
    """Bump ``version`` by one and set ``reindexVersion`` to 0 on one item.

    The item is read with ``get_item``, its numeric ``version`` attribute is
    incremented, and the result is written back with ``update_item``. The
    write carries the condition ``version < :newVersion``, so it is rejected
    if the stored version has already reached the new value.

    The ``@retry`` decorator re-invokes this function after a random wait
    (``wait_random(min=1, max=300)``) whenever it raises; no stop condition
    is configured. Each retry re-reads the item, so the new version is
    recomputed from the latest stored value.

    Args:
        table_name: Name of the DynamoDB table.
        id: Value for the ``id`` key attribute, passed through unchanged as
            ``Key={'id': id}``. The caller in this file passes ``item['id']``
            taken from a low-level client scan page. (The name shadows the
            builtin but is kept because it is part of the signature.)

    Returns:
        None.
    """
    dynamodb_client = boto3.client('dynamodb')
    key = {'id': id}
    response = dynamodb_client.get_item(TableName=table_name, Key=key)

    # get_item omits 'Item' when nothing matches the key. Indexing it
    # directly raised KeyError, which the unbounded retry above would repeat
    # forever; there is nothing to update, so skip the item instead.
    item = response.get('Item')
    if item is None:
        return

    new_version = int(item['version']['N']) + 1
    dynamodb_client.update_item(
        TableName=table_name,
        Key=key,
        UpdateExpression='SET version = :newVersion, reindexVersion=:reindexVersion',
        ConditionExpression='version < :newVersion',
        ExpressionAttributeValues={
            ':newVersion': {'N': str(new_version)},
            ':reindexVersion': {'N': str(0)},
        },
    )
@retry(wait=wait_random(min=1, max=300))
def paginate(paginator, table_name, filtering_exp):
    """Return ``paginator.paginate(...)`` for a filtered read of a table.

    Args:
        paginator: Object exposing ``paginate(TableName=..., FilterExpression=...)``;
            the caller in this file passes a boto3 ``scan`` paginator.
        table_name: Forwarded as ``TableName``.
        filtering_exp: Forwarded as ``FilterExpression``.

    Returns:
        Whatever ``paginator.paginate`` returns; the caller iterates it to
        obtain pages.

    Any exception raised by the call propagates to the ``@retry`` decorator,
    which re-invokes this function after a random wait.
    NOTE(review): boto3 paginators appear to issue requests lazily during
    iteration, in which case this retry would only cover building the
    iterator and not the scan requests themselves -- confirm.
    """
    scan_kwargs = {
        'TableName': table_name,
        'FilterExpression': filtering_exp,
    }
    return paginator.paginate(**scan_kwargs)
def add_reindex_version_all_items(table_name):
    """Run ``update_data`` on every item of a table lacking ``reindexVersion``.

    The table is scanned with the filter
    ``attribute_not_exists(reindexVersion)``. For each page of results, one
    ``update_data`` task per item is submitted to a pool of 10 worker
    processes, and the results are awaited in submission order behind a tqdm
    progress bar. ``"Done!"`` is printed once all pages are processed.

    Args:
        table_name: Name of the DynamoDB table to scan and update.

    Raises:
        multiprocessing.TimeoutError: If a single update has not finished
            within 3600 seconds. The offending item id is printed first.
        Exception: Any exception raised inside a worker is re-raised here by
            ``AsyncResult.get``.
    """
    # AsyncResult.get() raises multiprocessing.TimeoutError, which is NOT the
    # builtin TimeoutError, so it must be imported explicitly to be caught.
    from multiprocessing import TimeoutError as PoolTimeoutError

    dynamodb_client = boto3.client('dynamodb')
    paginator = dynamodb_client.get_paginator('scan')
    filtering_exp = "attribute_not_exists(reindexVersion)"

    # One pool is shared by all pages rather than spawning ten fresh worker
    # processes for every page of the scan.
    with Pool(10) as pool:
        for page in paginate(paginator, table_name, filtering_exp):
            # Keep each item's id next to its async result so a timeout can
            # report which item was affected.
            pending = [
                (item['id'], pool.apply_async(update_data, [table_name, item['id']]))
                for item in page['Items']
            ]
            for item_id, result in tqdm(pending):
                try:
                    result.get(timeout=3600)
                except PoolTimeoutError:
                    print(f'Updating {item_id} timed out!')
                    raise
    print("Done!")
if __name__ == "__main__":
    # Guard the entry point: under the "spawn" start method, multiprocessing
    # re-imports this module in every worker process, and an unguarded call
    # would start the whole scan again inside each child.
    add_reindex_version_all_items("SourceData")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment