@KamilMroczek
Created April 6, 2022 06:12
HTTP to S3
import logging
import time

import boto3
import requests
from boto3.s3.transfer import TransferConfig


class HttpDownloader(object):
    def get_to_s3(self, http_get_source_url: str, bucket_name: str, key: str, max_concurrency: int = 4):
        """
        Taken from https://amalgjose.com/2020/08/13/python-program-to-stream-data-from-a-url-and-write-it-to-s3/.

        :param http_get_source_url: The URL to download the data from. Fetched with a GET request.
        :param bucket_name: The target bucket name.
        :param key: The target S3 key.
        :param max_concurrency: The maximum number of threads to upload concurrently.
            Passed to boto3.s3.transfer.TransferConfig.max_concurrency.
        """
        logging.info(f'Downloading from {http_get_source_url} to s3://{bucket_name}/{key}.')

        def output_progress(bytes_transferred):
            logging.info(f'Transferred {bytes_transferred} bytes')

        start_time = time.time()

        # Stream the HTTP response body instead of loading it into memory.
        session = requests.Session()
        http_response = session.get(url=http_get_source_url, stream=True)
        http_response.raw.decode_content = True

        # upload_fileobj reads from the raw response stream and handles
        # multipart uploads internally according to TransferConfig.
        config = TransferConfig(max_concurrency=max_concurrency)
        boto3.client('s3').upload_fileobj(
            Fileobj=http_response.raw,
            Bucket=bucket_name,
            Key=key,
            Callback=output_progress,
            Config=config
        )
        logging.info(f'Completed successfully. Time taken {(time.time() - start_time) / 60} mins.')
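For reference, invoking the streaming version might look like the sketch below. The URL, bucket, and key are placeholders, and it assumes the bucket already exists and boto3 credentials are configured in the environment.

# Hypothetical usage -- the URL, bucket, and key below are placeholders.
downloader = HttpDownloader()
downloader.get_to_s3(
    http_get_source_url='https://example.com/large-file.bin',
    bucket_name='my-target-bucket',
    key='downloads/large-file.bin',
    max_concurrency=8,
)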
import logging

import boto3
import requests

BYTES_IN_MB = 2 ** 20
BYTES_IN_GB = 2 ** 30


class HttpDownloader(object):
    def get_to_s3(self, http_get_source_url: str, bucket_name: str, key: str, chunk_size=BYTES_IN_MB * 10):
        """
        Downloads an HTTP GET URL to S3 using a multipart upload.

        :param http_get_source_url: The URL to download the data from. Fetched with a GET request.
        :param bucket_name: The target bucket name.
        :param key: The target S3 key.
        :param chunk_size: The chunk size to upload to AWS with. S3 allows up to 10,000 parts per
            multipart upload, so the maximum file size is 10,000 * chunk_size. With the default you
            get roughly 100 GB.
        """
        logging.info(f'Downloading from {http_get_source_url} to s3://{bucket_name}/{key}.')

        def output_progress(bytes_transferred):
            mb = round(bytes_transferred / BYTES_IN_MB, 0)
            if mb > 1000:
                gb = round(bytes_transferred / BYTES_IN_GB, 1)
                logging.info(f'Transferred {gb} GB')
            else:
                logging.info(f'Transferred {mb} MB')

        # Stream the HTTP response body instead of loading it into memory.
        session = requests.Session()
        http_response = session.get(url=http_get_source_url, stream=True)

        client = boto3.client('s3')
        response = client.create_multipart_upload(Bucket=bucket_name, Key=key)
        upload_id = response['UploadId']
        chunk_id = 1
        chunks = []
        logging.info(f'Uploading to id {upload_id}')

        # Upload each chunk as a separate part. Parts are raw bytes, so the
        # response body is not decoded.
        for chunk in http_response.iter_content(chunk_size=chunk_size):
            part_response = client.upload_part(
                Body=chunk,
                Bucket=bucket_name,
                Key=key,
                PartNumber=chunk_id,
                UploadId=upload_id,
            )
            chunks.append({
                'ETag': part_response['ETag'],
                'PartNumber': chunk_id
            })
            output_progress(bytes_transferred=chunk_size * chunk_id)
            chunk_id += 1

        # Ask S3 to assemble the uploaded parts into the final object.
        response = client.complete_multipart_upload(
            Bucket=bucket_name,
            Key=key,
            MultipartUpload={'Parts': chunks},
            UploadId=upload_id
        )
        logging.info('Completed successfully.')
        logging.info(response)
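One caveat with the multipart path: if an exception is raised mid-transfer, the incomplete upload keeps its already-uploaded parts in S3 (and keeps incurring storage costs) until it is explicitly aborted. A minimal sketch of how the part-upload loop could be guarded, assuming the same client and upload_id as above:

# Sketch only: abort the multipart upload if any part fails, so orphaned
# parts are not left behind in the bucket.
try:
    ...  # the upload_part loop from above
except Exception:
    client.abort_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id)
    raise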