Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save banderlog/cd14886421cf1cfcbdca7e8aab9dc584 to your computer and use it in GitHub Desktop.
Save banderlog/cd14886421cf1cfcbdca7e8aab9dc584 to your computer and use it in GitHub Desktop.
Download dataset from AWS dataexchange via signedurls
#!/usr/bin/env python
"""
Download dataset from AWS dataexchange via signedurls
Docs on the matter:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dataexchange.html#DataExchange.Client.create_job
For the S3-bucket-based approach, look here:
https://github.com/aws-samples/aws-dataexchange-api-samples/blob/master/subscribers/python/download-entitled-assets/download-entitled-assets.py
"""
import boto3
import time
import urllib.request
from pathlib import Path
import click
def get_dataset_via_signedurls(dx, data_set_id: str, out_dir: str,
                               revision_id=None, rewrite=False):
    """Download all assets of a dataset revision using SIGNED_URL export jobs.

    :param dx: ``boto3.client('dataexchange', region_name='us-east-1')``
    :param data_set_id: ID of the selected dataset
    :param out_dir: destination root; ``dataset_name/revision_id`` subdirs
        are created underneath it
    :param revision_id: revision to download; the latest one when ``None``
    :param rewrite: when ``False``, assets already present on disk are skipped
    :raises Exception: if an export job finishes in the ERROR state
    """
    # Resolve the human-readable dataset name (used as a directory name).
    dataset_name = dx.get_data_set(DataSetId=data_set_id).get('Name')

    # Default to the most recent revision.
    if revision_id is None:
        res_r = dx.list_data_set_revisions(DataSetId=data_set_id)
        revision_id = res_r.get('Revisions')[0].get('Id')

    # Collect ALL assets, following pagination via NextToken — a single
    # list_revision_assets call only returns the first page and would
    # silently truncate large revisions.
    assets = []
    kwargs = {'DataSetId': data_set_id, 'RevisionId': revision_id}
    while True:
        res_a = dx.list_revision_assets(**kwargs)
        assets.extend(res_a.get('Assets'))
        next_token = res_a.get('NextToken')
        if not next_token:
            break
        kwargs['NextToken'] = next_token

    # Create the directory tree up front so urlretrieve never hits a
    # missing parent directory.
    destination = Path(out_dir, dataset_name, revision_id)
    for asset in assets:
        asset_path = Path(asset.get('Name')).parent
        Path(destination, asset_path).mkdir(parents=True, exist_ok=True)

    # Download each asset through a one-off export job.
    for asset in assets:
        asset_destination = Path(destination, asset.get('Name'))
        # Skip files downloaded by a previous (possibly interrupted) run.
        if (not rewrite) and asset_destination.is_file():
            print(f'Skipping "{asset_destination}"')
            continue
        print(f'Downloading file "{asset_destination}"')
        # Create a job to get the URL; the signed URL is valid for ~60 sec.
        job = dx.create_job(Type='EXPORT_ASSET_TO_SIGNED_URL',
                            Details={
                                'ExportAssetToSignedUrl': {
                                    "AssetId": asset.get('Id'),
                                    "DataSetId": asset.get('DataSetId'),
                                    "RevisionId": asset.get('RevisionId')
                                }
                            })
        job_id = job.get('Id')
        dx.start_job(JobId=job_id)
        # Poll the job state once per second until it settles.
        while True:
            time.sleep(1)
            job = dx.get_job(JobId=job_id)
            state = job.get('State')
            if state == 'COMPLETED':
                # Fetch the file through the signed URL.
                url = job.get('Details').get('ExportAssetToSignedUrl').get('SignedUrl')
                urllib.request.urlretrieve(url, asset_destination)
                print('COMPLETED')
                break
            elif state == 'ERROR':
                # On failure just rerun with ``rewrite=False``: finished
                # assets are skipped and the download resumes.
                message = job.get('Errors')[0].get('Message')
                raise Exception(f"Job {job_id} failed to complete - {message}")
            else:
                print('WAITING')
@click.command()
@click.option('--data_set_id', help='ID of selected dataset', required=True, type=str)
@click.option('--destination_dir', default='./', type=str, show_default=True,
              help="Where to download your dataset",)
@click.option('--revision_id', default=None, type=str, show_default=True,
              help="Desired revision of dataset. Latest for default.")
@click.option('--rewrite', is_flag=True, help="By default, it will not download already downloaded assets")
def main(data_set_id, destination_dir, revision_id, rewrite):
    """CLI entry point: download a dataset revision via signed URLs."""
    # NOTE(review): region is hard-coded; the Data Exchange API here is
    # assumed to live in us-east-1 — confirm for other partitions.
    client = boto3.client('dataexchange', region_name='us-east-1')
    get_dataset_via_signedurls(client, data_set_id, destination_dir,
                               revision_id=revision_id, rewrite=rewrite)
    print("------- DONE -------")
    return 0
# Script entry point — Click parses sys.argv and dispatches to main().
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment