Last active
April 10, 2018 09:32
-
-
Save alicefuzier/61946cc6ebbf4b273b70c589ede56a2e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
from multiprocessing import Pool | |
from tenacity import retry | |
from tenacity.wait import wait_random | |
from tqdm import tqdm | |
@retry(wait=wait_random(min=1, max=300)) | |
def update_data(table_name, id): | |
try: | |
dynamodb_client = boto3.client('dynamodb') | |
key = {'id': id} | |
item = dynamodb_client.get_item(TableName=table_name,Key=key)['Item'] | |
new_version = int(item['version']['N']) + 1 | |
dynamodb_client.update_item( | |
TableName=table_name, | |
Key=key, | |
UpdateExpression='SET version = :newVersion, reindexVersion=:reindexVersion', | |
ConditionExpression='version < :newVersion', | |
ExpressionAttributeValues={ | |
':newVersion': {'N': str(new_version)}, | |
':reindexVersion': {'N': str(0)}, | |
} | |
) | |
except Exception as e: | |
raise e | |
@retry(wait=wait_random(min=1, max=300)) | |
def paginate(paginator, table_name, filtering_exp): | |
try: | |
return paginator.paginate(TableName=table_name, FilterExpression=filtering_exp) | |
except Exception as e: | |
raise e | |
def add_reindex_version_all_items(table_name): | |
dynamodb_client = boto3.client('dynamodb') | |
paginator = dynamodb_client.get_paginator('scan') | |
filtering_exp = "attribute_not_exists(reindexVersion)" | |
for page in paginate(paginator, table_name, filtering_exp): | |
with Pool(10) as pool: | |
processes = [pool.apply_async(update_data,[table_name, item['id']]) for item in page['Items']] | |
for process in tqdm(processes): | |
try: | |
process.get(timeout=3600) | |
except TimeoutError as e: | |
print(f'Updating {id} timed out!') | |
raise e | |
print("Done!") | |
add_reindex_version_all_items("SourceData") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment