
@bwicklund
Last active March 12, 2023 08:27
S3 file Concatenation/Combination. S3 Spark file merge.
import argparse
import boto3
import os
from fnmatch import fnmatch

# All parts of an S3 multipart upload except the last must be at least 5 MiB.
# 6 MB is used here to stay safely above that limit.
MIN_S3_SIZE = 6000000
LOG_LEVEL = 'INFO'


def concat(bucket, key, result_key, pattern):
    s3_client = boto3.session.Session().client('s3')
    objects_list = [x for x in list_all_objects(
        s3_client, bucket, result_key, key) if fnmatch(x[0], pattern)]
    print(
        f"Found {len(objects_list)} parts to concatenate in s3://{bucket}/{key}")
    for obj in objects_list:
        print(f"Found: {obj[0]} - {round(obj[1]/1000, 2)}k")
    run_concatenation(s3_client, bucket, key, result_key, objects_list)


def list_all_objects(s3_client, bucket, result_key, key):
    def format_return(resp):
        return [(x['Key'], x['Size']) for x in resp['Contents']]

    objects = []
    resp = s3_client.list_objects(Bucket=bucket, Prefix=key)
    objects.extend(format_return(resp))
    while resp['IsTruncated']:
        # If there are more objects than can be returned in a single request,
        # the key of the last item is used as the marker for pagination.
        last_key = objects[-1][0]
        resp = s3_client.list_objects(
            Bucket=bucket, Prefix=key, Marker=last_key)
        objects.extend(format_return(resp))
    return objects


def run_concatenation(s3_client, bucket, key, result_key, objects_list):
    if len(objects_list) > 1:
        upload_id = s3_client.create_multipart_upload(
            Bucket=bucket, Key=result_key)['UploadId']
        parts_mapping = assemble_parts_to_concatenate(
            s3_client, bucket, key, result_key, upload_id, objects_list)
        if len(parts_mapping) == 0:
            resp = s3_client.abort_multipart_upload(
                Bucket=bucket, Key=result_key, UploadId=upload_id)
            print(
                f"Aborted concatenation for file {result_key}, parts list empty!")
        else:
            resp = s3_client.complete_multipart_upload(
                Bucket=bucket, Key=result_key, UploadId=upload_id, MultipartUpload={'Parts': parts_mapping})
            print(
                f"Finished concatenation for file {result_key} response was: {resp}")
    elif len(objects_list) == 1:
        # Only a single file matched, so a simple S3 copy is enough.
        resp = s3_client.copy_object(
            Bucket=bucket, CopySource=f"{bucket}/{objects_list[0][0]}", Key=result_key)
        print(f"Copied single file to {result_key} response was: {resp}")
    else:
        print(f"No files to concatenate for {result_key}")


def assemble_parts_to_concatenate(s3_client, bucket, key, result_key, upload_id, objects_list):
    parts_mapping = []
    part_num = 0

    s3_objects = ["{}/{}".format(bucket, p[0])
                  for p in objects_list if p[1] > MIN_S3_SIZE]
    local_objects = [p[0] for p in objects_list if p[1]
                     <= MIN_S3_SIZE and not p[0] == f"{key}/"]
    total = len(s3_objects) + len(local_objects)

    # Assemble parts large enough for a direct server-side S3 copy.
    # Part numbers are 1-indexed.
    for part_num, source_object in enumerate(s3_objects, 1):
        resp = s3_client.upload_part_copy(Bucket=bucket,
                                          Key=result_key,
                                          PartNumber=part_num,
                                          UploadId=upload_id,
                                          CopySource=source_object)
        print(f"@@@ Uploaded S3 object #{part_num} of {total}")
        parts_mapping.append(
            {'ETag': resp['CopyPartResult']['ETag'][1:-1], 'PartNumber': part_num})

    # Download the objects too small for a direct S3 copy, combine them,
    # and upload the result as the last part of the multipart upload
    # (the last part has no 5 MiB minimum).
    small_objects = []
    for source_object in local_objects:
        # Replace forward slashes so the key maps to a flat temp filename.
        temp_filename = "/tmp/{}".format(source_object.replace("/", "_"))
        s3_client.download_file(
            Bucket=bucket, Key=source_object, Filename=temp_filename)
        with open(temp_filename, 'rb') as f:
            small_objects.append(f.read())
        os.remove(temp_filename)
        print(f"@@@ Downloaded S3 Object: {source_object}")

    if len(small_objects) > 0:
        last_part_num = part_num + 1
        last_object = b''.join(small_objects)
        resp = s3_client.upload_part(
            Bucket=bucket, Key=result_key, PartNumber=last_part_num, UploadId=upload_id, Body=last_object)
        print(f"@@@ Uploaded S3 object #{last_part_num} of {total}")
        parts_mapping.append(
            {'ETag': resp['ETag'][1:-1], 'PartNumber': last_part_num})

    return parts_mapping


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="S3 Concatenation Utility.")
    parser.add_argument("--bucket", help="S3 Bucket.")
    parser.add_argument(
        "--key", help="Key/Folder Whose Contents Should Be Combined.")
    parser.add_argument(
        "--result_key", help="Output of Concatenation, Relative To The Specified Bucket.")
    parser.add_argument("--pattern", default='*',
                        help="Pattern To Match The File Names Against For Adding To The Combination.")
    args = parser.parse_args()

    print("Combining files in s3://{}/{} to s3://{}/{} matching pattern {}".format(
        args.bucket, args.key, args.bucket, args.result_key, args.pattern))
    concat(args.bucket, args.key, args.result_key, args.pattern)
@bwicklund (Author)

Usage:

usage: s3_file_combine.py [-h] [--bucket BUCKET] [--key KEY]
                          [--result_key RESULT_KEY] [--pattern PATTERN]

S3 Concatenation Utility.

optional arguments:
  -h, --help            show this help message and exit
  --bucket BUCKET       S3 Bucket.
  --key KEY             Key/Folder Whose Contents Should Be Combined.
  --result_key RESULT_KEY
                        Output of Concatenation, Relative To The Specified
                        Bucket.
  --pattern PATTERN     Pattern To Match The File Names Against For Adding To
                        The Combination.
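As a concrete illustration, an invocation might look like the following; the bucket, prefix, and result key are placeholders, not names from this gist:

python3 s3_file_combine.py --bucket my-bucket --key logs/2023/01/01/ --result_key combined/2023-01-01.log --pattern "*.log"

This would merge every object under logs/2023/01/01/ whose name matches *.log into a single object at combined/2023-01-01.log in the same bucket.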

@Lydon-01 commented Mar 1, 2019

Am I doing something wrong?

$ python s3_file_combine.py -bucket lydon-bucket --key /temp/2019/02/02/07/ --result_key combined.gz --pattern "*.gz"
File "s3_file_combine.py", line 17
f"Found {len(objects_list)} parts to concatenate in s3://{bucket}/{key}")

SyntaxError: invalid syntax

@mlouie commented Mar 5, 2019

@Lydon-01 What version of Python are you using? It's possible the syntax error is coming from the use of formatted string literals (f-strings). I was getting the same error, but after switching to 3.7.2 it works fine for me now.
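For reference, f-strings were added in Python 3.6, so the script fails to parse on older interpreters. A minimal sketch of the same print statement written with .format(), which should also work on Python 2.7 and 3.x before 3.6:

# Equivalent of the f-string on interpreters older than Python 3.6.
print("Found {} parts to concatenate in s3://{}/{}".format(
    len(objects_list), bucket, key))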

@bwicklund (Author)

@Lydon-01 You still having problems?

@aarsanjani

Great utility. However, I am experiencing some issues: it seems to download OK, then when it tries to combine files, it prints the following and produces no output:
<after multiple @@@Downloaded>
"@@@ Uploaded S3 object #1 of 52
Finished concatenation for file output/file.combine response was: {'ResponseMetadata': {'RequestId': '40469EFF169AD06C', 'HostId': 'XXX', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'jXXX=', 'x-amz-request-id': 'XXX', 'date': 'Sun, 08 Dec 2019 18:56:33 GMT', 'content-type': 'application/xml', 'transfer-encoding': 'chunked', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'Location': 'https://vrct-bucket.s3.us-west-2.amazonaws.com/output%2Ffile.combine', 'Bucket': 'vrct-bucket', 'Key': 'output/file.combine', 'ETag': '"XXX"'}

@bwicklund (Author) commented Dec 9, 2019 via email

@farshadniayeshpour

Should this be used as a Lambda function or locally/EC2?

@bwicklund (Author) commented Jan 21, 2020 via email

@farshadniayeshpour

Does this concatenate files less than 5mb?

@bwicklund (Author) commented Jan 21, 2020 via email

@farshadniayeshpour

Thank you!

@farshadniayeshpour

I'm sorry, but all my files are in the root directory of the S3 bucket and I don't know what to put for --key. Any ideas?
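One possibility, not verified against this exact script: since --key is passed straight through to list_objects as the Prefix, an empty prefix should match objects at the root of the bucket. A hypothetical invocation (placeholder names) might be:

python3 s3_file_combine.py --bucket my-bucket --key "" --result_key combined.csv --pattern "*.csv"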

@farshadniayeshpour

FileNotFoundError: [Errno 2] No such file or directory: '/tmp/10400_1995-01-06-output.csv.14Ba33Ee'
I am getting this error. It cannot find the temp file. Can you help me with this?
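One way to sidestep /tmp entirely would be to read the small objects into memory instead of downloading them to disk. A rough, untested sketch of how the download loop in assemble_parts_to_concatenate could be rewritten using only standard boto3 calls:

small_objects = []
for source_object in local_objects:
    # Read the object body directly instead of writing a temp file under /tmp.
    body = s3_client.get_object(Bucket=bucket, Key=source_object)['Body'].read()
    small_objects.append(body)
    print(f"@@@ Downloaded S3 Object: {source_object}")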

@xfinity1010

It is failing with "Access Denied". The bucket is SSE-KMS encrypted, so I need to pass a KMS key when uploading the objects to S3. Please suggest the changes.
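The script does not pass any encryption arguments today. A sketch of where SSE-KMS parameters could be added, assuming a KMS key ID supplied by the caller (kms_key_id is a hypothetical variable, not part of the script):

# Multipart path: declare the encryption when the upload is created.
upload_id = s3_client.create_multipart_upload(
    Bucket=bucket, Key=result_key,
    ServerSideEncryption='aws:kms', SSEKMSKeyId=kms_key_id)['UploadId']

# Single-file path: pass the same arguments to copy_object.
resp = s3_client.copy_object(
    Bucket=bucket, CopySource=f"{bucket}/{objects_list[0][0]}", Key=result_key,
    ServerSideEncryption='aws:kms', SSEKMSKeyId=kms_key_id)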

@crazyavi

Hi, I am using this on an EC2 instance, but my output is not showing all records (some records are missing). I am using it for parquet files.
Command:
python3 s3_file_combine.py --bucket capmlp --key new/pvaimariasource1/pvai_activity1/ --result_key new/pvaimariasource1/pvai_activity1/LOAD00000002.parquet --pattern "*.parquet"
Any ideas?
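Worth noting: parquet files carry per-file footers and metadata, so byte-level concatenation (which is what this script does) generally does not produce a valid parquet file, which could explain missing records. A rough alternative sketch using pyarrow on parts already downloaded locally, assuming the parts share a schema (file names below are placeholders):

import pyarrow as pa
import pyarrow.parquet as pq

# Read each part, then write one combined parquet file.
tables = [pq.read_table(p) for p in ["part-0001.parquet", "part-0002.parquet"]]
pq.write_table(pa.concat_tables(tables), "combined.parquet")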

@bwicklund (Author)

Sorry to be unresponsive on this. I no longer use this utility, and don't have the time or the environment to continue developing or testing it. If anyone would be willing to take it over and work through the issues brought up here, that would be awesome.
