Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Python script to efficiently concatenate S3 files
'''
This script performs efficient concatenation of files stored in S3. Given a
folder, output location, and optional suffix, all files with the given suffix
will be concatenated into one file stored in the output location.
Concatenation is performed within S3 when possible, falling back to local
operations when necessary.
Run `python combineS3Files.py -h` for more info.
'''
import boto3
import os
import threading
import argparse
import logging
# Script expects everything to happen in one bucket. The name is empty at
# import time and is assigned from the --bucket argument when the script is
# run from the command line; every S3 call below reads this module global.
BUCKET = "" # set by command line args
# S3 multi-part upload parts must be larger than 5mb. Objects bigger than this
# threshold (in bytes) are copied inside S3 as a part of their own; objects at
# or below it are downloaded and merged locally into one final part.
MIN_S3_SIZE = 6000000
# Setup logger to display timestamp in front of every message
logging.basicConfig(format='%(asctime)s => %(message)s')
def run_concatenation(folder_to_concatenate, result_filepath, file_suffix, max_filesize):
    """Concatenate the matching objects of a folder into size-bounded outputs.

    Lists every object under ``folder_to_concatenate`` whose key ends with
    ``file_suffix``, splits that listing into groups with ``chunk_by_size``
    and concatenates each group into its own object.

    Args:
        folder_to_concatenate: key prefix, inside ``BUCKET``, to search.
        result_filepath: base key of the outputs; group ``i`` (counted from
            zero) is written to ``"{result_filepath}-{i}"``.
        file_suffix: only keys ending with this suffix are included.
        max_filesize: size limit, in bytes, handed to ``chunk_by_size``.
    """
    s3 = new_s3_client()
    # Materialise the listing: on Python 3 a lazy ``filter`` object has no
    # len() and can only be iterated once, and both are needed below.
    parts_list = list(collect_parts(s3, folder_to_concatenate, file_suffix))
    logging.warning("Found {} parts to concatenate in {}/{}".format(len(parts_list), BUCKET, folder_to_concatenate))
    grouped_parts_list = chunk_by_size(parts_list, max_filesize)
    logging.warning("Created {} concatenation groups".format(len(grouped_parts_list)))
    for i, parts in enumerate(grouped_parts_list):
        # Report progress 1-based so the last group reads "n/n"; the output
        # key keeps the original 0-based suffix so existing names are stable.
        logging.warning("Concatenating group {}/{}".format(i + 1, len(grouped_parts_list)))
        run_single_concatenation(s3, parts, "{}-{}".format(result_filepath, i))
def run_single_concatenation(s3, parts_list, result_filepath):
    """Concatenate one group of objects into ``result_filepath``.

    Args:
        s3: boto3 S3 client used for every request.
        parts_list: list of ``(key, size)`` tuples to combine.
        result_filepath: destination key inside ``BUCKET``.

    Raises:
        Exception: whatever the underlying S3 calls raise. For the multi-part
            path the upload is aborted before the error is re-raised.
    """
    if not parts_list:
        logging.warning("No files to concatenate for {}".format(result_filepath))
        return

    if len(parts_list) == 1:
        # A single file only needs copying. The managed ``copy`` transfer is
        # used instead of ``copy_object`` because the latter rejects sources
        # larger than 5 GB; the managed call switches to a multipart copy
        # when required. The dict form of the source also avoids having to
        # build a "bucket/key" string by hand.
        s3.copy({'Bucket': BUCKET, 'Key': parts_list[0][0]}, BUCKET, result_filepath)
        logging.warning("Copied single file to {}".format(result_filepath))
        return

    # perform multi-part upload
    upload_id = initiate_concatenation(s3, result_filepath)
    try:
        parts_mapping = assemble_parts_to_concatenate(s3, result_filepath, upload_id, parts_list)
        complete_concatenation(s3, result_filepath, upload_id, parts_mapping)
    except Exception:
        # Do not leave a half-built multipart upload behind: its parts stay
        # stored in the bucket until the upload is completed or aborted.
        logging.warning("Concatenation failed for {}, aborting upload id #{}".format(result_filepath, upload_id))
        s3.abort_multipart_upload(Bucket=BUCKET, Key=result_filepath, UploadId=upload_id)
        raise
def chunk_by_size(parts_list, max_filesize):
    """Split ``(key, size)`` parts into consecutive groups bounded by size.

    Parts keep their original order. A new group is started whenever adding
    the next part would push the running total above ``max_filesize``, so a
    group only exceeds the limit when it consists of a single part that is
    itself larger than the limit.

    Args:
        parts_list: iterable of ``(key, size)`` tuples.
        max_filesize: maximum total size of one group, in bytes. ``None``
            means "no limit" and yields at most one group.

    Returns:
        A list of non-empty lists of parts; empty when ``parts_list`` is
        empty.
    """
    grouped_list = []
    current_list = []
    current_size = 0
    for part in parts_list:
        part_size = part[1]
        would_overflow = (max_filesize is not None
                          and current_size + part_size > max_filesize)
        # Close the running group *before* adding the part so the group stays
        # within the limit; never emit an empty group for an oversized part.
        if current_list and would_overflow:
            grouped_list.append(current_list)
            current_list = []
            current_size = 0
        current_list.append(part)
        current_size += part_size
    # Keep the leftover parts that did not fill a complete group; dropping
    # them would silently lose data from the concatenation.
    if current_list:
        grouped_list.append(current_list)
    return grouped_list
def new_s3_client():
    """Build an S3 client on top of a freshly created boto3 session.

    A private session is used, rather than a shared one, so that
    multithreading doesn't cause issues with the client's internal state.
    """
    return boto3.session.Session().client('s3')
def collect_parts(s3, folder, suffix):
    """Return the ``(key, size)`` tuples under ``folder`` matching ``suffix``.

    Args:
        s3: boto3 S3 client used for the listing.
        folder: key prefix, inside ``BUCKET``, to list.
        suffix: only keys ending with this string are kept. ``None`` or an
            empty string keeps every key.

    Returns:
        A real list (not a lazy ``filter`` object), so callers can take its
        length and iterate it more than once on Python 3 as well.
    """
    # A missing --suffix arrives as None, which str.endswith rejects.
    wanted_suffix = suffix or ""
    return [part for part in _list_all_objects_with_size(s3, folder)
            if part[0].endswith(wanted_suffix)]
def _list_all_objects_with_size(s3, folder):
    """List every object under ``folder`` as ``(key, size)`` tuples.

    Args:
        s3: boto3 S3 client used for the listing.
        folder: key prefix, inside ``BUCKET``, to list.

    Returns:
        A list of ``(key, size)`` tuples in the order S3 returned them; empty
        when nothing matches the prefix.
    """
    def resp_to_filelist(resp):
        # S3 leaves the 'Contents' entry out of the response altogether when
        # no key matches, so a missing entry means "no objects", not an error.
        return [(x['Key'], x['Size']) for x in resp.get('Contents', [])]

    objects_list = []
    resp = s3.list_objects(Bucket=BUCKET, Prefix=folder)
    objects_list.extend(resp_to_filelist(resp))
    while resp['IsTruncated']:
        # if there are more entries than can be returned in one request, the key
        # of the last entry returned acts as a pagination value for the next request
        logging.warning("Found {} objects so far".format(len(objects_list)))
        last_key = objects_list[-1][0]
        resp = s3.list_objects(Bucket=BUCKET, Prefix=folder, Marker=last_key)
        objects_list.extend(resp_to_filelist(resp))
    return objects_list
def initiate_concatenation(s3, result_filename):
    """Open a multi-part upload for ``result_filename`` and return its id.

    Concatenating inside S3 works by creating a multi-part upload and then
    referencing the existing S3 files as the "parts" of that upload; this
    function performs the first of those steps.
    """
    response = s3.create_multipart_upload(Bucket=BUCKET, Key=result_filename)
    message = "Initiated concatenation attempt for {}, and got response: {}"
    logging.warning(message.format(result_filename, response))
    return response['UploadId']
def assemble_parts_to_concatenate(s3, result_filename, upload_id, parts_list):
    """Register every source object as a part of an open multi-part upload.

    Objects larger than ``MIN_S3_SIZE`` are copied inside S3, one part each.
    The remaining small objects are downloaded, joined in memory and uploaded
    together as the final part. As a consequence the small objects always end
    up after the large ones in the result, whatever their order in
    ``parts_list``.

    Args:
        s3: boto3 S3 client used for every request.
        result_filename: destination key of the multi-part upload.
        upload_id: id returned when the multi-part upload was created.
        parts_list: list of ``(key, size)`` tuples to combine.

    Returns:
        A list of ``{'ETag': ..., 'PartNumber': ...}`` dicts, one per uploaded
        part, in part-number order; empty when ``parts_list`` is empty.
    """
    parts_mapping = []
    s3_parts = ["{}/{}".format(BUCKET, p[0]) for p in parts_list if p[1] > MIN_S3_SIZE]
    local_parts = [p[0] for p in parts_list if p[1] <= MIN_S3_SIZE]

    # assemble parts large enough for direct S3 copy
    for part_num, source_part in enumerate(s3_parts, 1): # part numbers are 1 indexed
        resp = s3.upload_part_copy(Bucket=BUCKET,
                                   Key=result_filename,
                                   PartNumber=part_num,
                                   UploadId=upload_id,
                                   CopySource=source_part)
        logging.warning("Setup S3 part #{}, with path: {}, and got response: {}".format(part_num, source_part, resp))
        # [1:-1] drops the first and last character of the ETag (the quotes
        # S3 wraps it in) before it is recorded.
        parts_mapping.append({'ETag': resp['CopyPartResult']['ETag'][1:-1], 'PartNumber': part_num})

    # assemble parts too small for direct S3 copy by downloading them locally,
    # combining them, and then reuploading them as the last part of the
    # multi-part upload (which is not constrained to the 5mb limit)
    small_parts = []
    for source_part in local_parts:
        temp_filename = "/tmp/{}".format(source_part.replace("/", "_"))
        try:
            s3.download_file(Bucket=BUCKET, Key=source_part, Filename=temp_filename)
            with open(temp_filename, 'rb') as f:
                small_parts.append(f.read())
        finally:
            # Clean up even when the download or read fails part-way.
            if os.path.exists(temp_filename):
                os.remove(temp_filename)
        logging.warning("Downloaded and copied small part with path: {}".format(source_part))

    if small_parts:
        last_part_num = len(s3_parts) + 1
        # The files were read in binary mode, so they must be joined as bytes;
        # joining with a text '' raises TypeError on Python 3.
        last_part = b''.join(small_parts)
        resp = s3.upload_part(Bucket=BUCKET, Key=result_filename, PartNumber=last_part_num, UploadId=upload_id, Body=last_part)
        logging.warning("Setup local part #{} from {} small files, and got response: {}".format(last_part_num, len(small_parts), resp))
        parts_mapping.append({'ETag': resp['ETag'][1:-1], 'PartNumber': last_part_num})
    return parts_mapping
def complete_concatenation(s3, result_filename, upload_id, parts_mapping):
    """Finish the multi-part upload, or abort it when it has no parts.

    Args:
        s3: boto3 S3 client used for the request.
        result_filename: destination key of the multi-part upload.
        upload_id: id of the multi-part upload to finish.
        parts_mapping: list of ``{'ETag', 'PartNumber'}`` dicts describing
            the uploaded parts.
    """
    # Nothing was uploaded: there is no object to assemble, so drop the upload.
    if not parts_mapping:
        s3.abort_multipart_upload(Bucket=BUCKET, Key=result_filename, UploadId=upload_id)
        logging.warning("Aborted concatenation for file {}, with upload id #{} due to empty parts mapping".format(result_filename, upload_id))
        return

    upload_description = {'Parts': parts_mapping}
    s3.complete_multipart_upload(Bucket=BUCKET, Key=result_filename, UploadId=upload_id, MultipartUpload=upload_description)
    logging.warning("Finished concatenation for file {}, with upload id #{}, and parts mapping: {}".format(result_filename, upload_id, parts_mapping))
if __name__ == "__main__":
    # Command line entry point: parse the arguments, publish the bucket name
    # through the BUCKET module global, then run the concatenation.
    parser = argparse.ArgumentParser(description="S3 file combiner")
    # bucket, folder, output and filesize have no usable default: leaving one
    # out used to pass None along and fail later with an unrelated error, so
    # argparse now rejects the command line up front.
    parser.add_argument("--bucket", required=True, help="base bucket to use")
    parser.add_argument("--folder", required=True, help="folder whose contents should be combined")
    parser.add_argument("--output", required=True, help="output location for resulting merged files, relative to the specified base bucket")
    # An empty suffix matches every key, which is the natural "no filter".
    parser.add_argument("--suffix", default="", help="suffix of files to include in the combination")
    parser.add_argument("--filesize", type=int, required=True, help="max filesize of the concatenated files in bytes")
    args = parser.parse_args()
    # Assign the bucket before the first use of BUCKET; logging first printed
    # an empty bucket name in the summary line.
    BUCKET = args.bucket
    logging.warning("Combining files in {}/{} to {}/{}, with a max size of {} bytes".format(BUCKET, args.folder, BUCKET, args.output, args.filesize))
    run_concatenation(args.folder, args.output, args.suffix, args.filesize)
from datetime import datetime
import combineS3Files as combiner
from multiprocessing import Pool
def folders_at_level(base_folder, level):
    """Return the set of S3 folders found at a given depth under a base folder.

    This allows a list to be built up of folders that should have their
    contents concatenated. Each key listed under ``base_folder`` is cut down
    to its first ``level + 1`` slash-separated components.

    Example: level -->  1 2 3
                        | | |
            base_folder/a/b/c
    """
    s3 = combiner.new_s3_client()
    component_count = level + 1
    folders = set()
    for part in combiner.collect_parts(s3, base_folder, ""):
        key_components = part[0].split('/')
        folders.add('/'.join(key_components[:component_count]))
    return folders
def rollup(input_folder, max_filesize=5 * 1024 ** 4):
    """Concatenate every object under ``input_folder`` into the rollup area.

    Outputs are written under ``pythonrollup/<input_folder>/all.json``.

    Args:
        input_folder: key prefix whose objects are combined.
        max_filesize: size limit, in bytes, of one output object. It has a
            default because the parallel map function only passes one
            argument; the default (5 TiB) is large enough that a folder
            normally ends up in a single output group.
    """
    # run_concatenation takes four arguments; the size limit used to be left
    # out, which raised TypeError on every call.
    # NOTE(review): nothing in this script assigns combiner.BUCKET, which is
    # an empty string at import time -- confirm it is set before running.
    combiner.run_concatenation(input_folder, "pythonrollup/{}/all.json".format(input_folder), "", max_filesize)
if __name__ == '__main__':
    # Driver: list the folders three levels below the base folder, then roll
    # each of them up in a pool of worker processes, printing timestamps
    # around each phase.
    print("Start: {}".format(datetime.now()))
    folders_to_rollup = folders_at_level("split-2015-07-06", 3)
    print("Folders: {}".format(folders_to_rollup))
    print("Finished Getting S3 Listing: {}".format(datetime.now()))
    pool = Pool(processes=128)
    try:
        # 200 is the chunksize: folders are handed to the workers in batches.
        pool.map(rollup, folders_to_rollup, 200)
    finally:
        # Release the worker processes even when a rollup raises.
        pool.close()
        pool.join()
    print("End: {}".format(datetime.now()))
@SasikalaRaju

This comment has been minimized.

Copy link

@SasikalaRaju SasikalaRaju commented Sep 24, 2016

Hello,
Does it work if the number of files under the given folder exceeds 10,000? A multipart upload is limited to 10,000 parts.

@SasikalaRaju

This comment has been minimized.

Copy link

@SasikalaRaju SasikalaRaju commented Sep 24, 2016

Hello,

Does it work if the number of files under the given folder exceeds 10,000? A multipart upload is limited to 10,000 parts.

@ThomasGro

This comment has been minimized.

Copy link

@ThomasGro ThomasGro commented Feb 1, 2017

Hi Jason,
I am not sure I use the filesize flag correctly. I have 2 WGS files of >9GB that I want to concatenate:
2017-01-29 12:58 9262904819 s3://mybucket/myfolder/HCT116-Day0A/FCHF2KYALXX_L5_HUMmkrEAACWABA-375_1.fq.gz
2017-01-29 13:32 9241600244 s3://mybucket/myfolder/HCT116-Day0A/FCHF2KYALXX_L6_HUMmkrEAACWABA-375_1.fq.gz

When I set the filesize to 1GB I get the following error:

python combineS3Files.py --bucket 'mybucket' --folder 'myfolder' --suffix '_1.fq.gz' --output 'newfolder/HCT116-Day0A_1.fq.gz' --filesize 1000000000

2017-02-01 15:13:24,708 => Combining files in mybucket to newfolder/HCT116-Day0A_1.fq.gz, with a max size of 1000000000 bytes
2017-02-01 15:13:25,931 => Found 2 parts to concatenate in myfolder/HCT116-Day0A/
2017-02-01 15:13:25,931 => Created 2 concatenation groups
2017-02-01 15:13:25,931 => Concatenating group 0/2
Traceback (most recent call last):
File "combineS3Files.py", line 158, in
run_concatenation(args.folder, args.output, args.suffix, args.filesize)
File "combineS3Files.py", line 33, in run_concatenation
run_single_concatenation(s3, parts, "{}-{}".format(result_filepath, i))
File "combineS3Files.py", line 44, in run_single_concatenation
resp = s3.copy_object(Bucket=BUCKET, CopySource="{}/{}".format(BUCKET, parts_list[0][0]), Key=result_filepath)
File "/home/m03146/.local/lib/python2.7/site-packages/botocore/client.py", line 251, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/home/m03146/.local/lib/python2.7/site-packages/botocore/client.py", line 537, in _make_api_call
raise ClientError(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (InvalidRequest) when calling the CopyObject operation: The specified copy source is larger than the maximum allowable size for a copy source: 5368709120

Hope you can help.
Thomas

@scotthankinson

This comment has been minimized.

Copy link

@scotthankinson scotthankinson commented Feb 1, 2017

@ThomasGro
This is not an issue with the sample above, but with boto itself. S3 has a max file size of ~5GB for copy operations. You'll have to make your source files smaller to assemble them.

http://stackoverflow.com/questions/10355941/how-can-i-copy-files-bigger-than-5-gb-in-amazon-s3

@sbailliez

This comment has been minimized.

Copy link

@sbailliez sbailliez commented Aug 21, 2017

The chunk_by_size has a small bug I think. It will be missing the data that does not yet form a full part, so for instance, if there is only 5MB accumulated out of a max of 10MB, it will return an empty grouped_list whereas it should return it. I suppose it depends on the use case one has, though.

# add leftover part that does not form a complete max_filesize block
if current_size > 0:
    grouped_list.append(current_list)
    
return grouped_list
@jaco8

This comment has been minimized.

Copy link

@jaco8 jaco8 commented Aug 23, 2017

@sbailliez Great point!

@Manjanjanadri

This comment has been minimized.

Copy link

@Manjanjanadri Manjanjanadri commented May 9, 2018

Hello friends any one know about AWS download file from s3 file using python script please ping me 7338320090

@w73919612

This comment has been minimized.

Copy link

@w73919612 w73919612 commented Jul 8, 2018

I am new to this and I have really tried to get this working. I have 261 95MB files that i uploaded with a script to my S3 bucket. Now I need to to combine them back into 1 single file.

If I put a filesize of less than the 25GB single file size, the script works but I get several files instead of 1.

If I run the following command, which sets the max file size of the output file big enough to include all the parts, it doesn't do anything.

I think I may be missing the point of this code...

python combineOnS3.py --bucket vanillalv83vmwithfnbongpharma --folder splitfiles --output LV_Local_Demo_.vmdk --filesize 300000000000

@w73919612

This comment has been minimized.

Copy link

@w73919612 w73919612 commented Jul 8, 2018

I would like some answers on my above comment, but my friend just told me of the new aws cli command, and it uploaded my 23 GB file like a charm no problems...

aws s3 cp ./<file-to-upload.extension> s3://<bucket_name>/<filename-to-save-uploadedfile-as.extension>

just ran this in a git bash terminal window on my windows machine. :)

@xtream1101

This comment has been minimized.

Copy link

@xtream1101 xtream1101 commented Mar 2, 2019

I created a python lib and cli tool that does this based around the code in this gist. It can be found here https://github.com/xtream1101/s3-concat

@fzf058

This comment has been minimized.

Copy link

@fzf058 fzf058 commented Jun 25, 2019

Do you have an example where the s3 bucket name and folder or path are filled in? I'm not clear on what rows where that information needs to be manually typed in to the code.

@sagarsitap596

This comment has been minimized.

Copy link

@sagarsitap596 sagarsitap596 commented Oct 22, 2019

Thanks great utility.

Your file size is exceeding max_filesize.

check this -

def chunk_by_size(parts_list, max_filesize):
    grouped_list = []
    current_list = []
    current_size = 0
    for p in parts_list:
       if current_size + p[1]  > max_filesize:
            grouped_list.append(current_list)
            current_list = []
            current_size = 0

        current_size += p[1]
        current_list.append(p)

    return grouped_list
@harinathselvaraj

This comment has been minimized.

Copy link

@harinathselvaraj harinathselvaraj commented Jan 10, 2020

I created a python lib and cli tool that does this based around the code in this gist. It can be found here https://github.com/xtream1101/s3-concat

I used this and it works perfectly. Thank you so much @xtream1101 :)

@vjkholiya123

This comment has been minimized.

Copy link

@vjkholiya123 vjkholiya123 commented Feb 17, 2020

I have WAV files stored in S3 bucket which I created from Media Stream recording through React JS. I got the blob of the recording, then converted that blob to base64 string and from that string I created a buffer and then converted that buffer to a WAV file and stored in S3. Now I want to concatenate all those stored WAV files inside my lambda function using NodeJS. Since there is a 500MB limitations in Lambda function so I don't want to store in /tmp and directly upload the concatenated file into S3 bucket. Can anyone help in this? I have tried to concatenate buffer array which I received for every WAV file fetched from S3 but the audio is only coming from 1st audio i.e if I am concatenating 4 audio files only the first audio sound is played.

@xtream1101

This comment has been minimized.

Copy link

@xtream1101 xtream1101 commented Feb 18, 2020

@vjkholiya123, This gist as well as my s3-concat python just takes the bytes of one file and append it to another. This type of concatenation only works for certain files.

The reason you are only hearing the first audio file is that most files have a start and an end to them. So in your case, once the first audio file is done playing, the player sees the ending bytes and thinks it's done (no more audio).

To combine multiple audio files together you will have to use some other tool like ffmpeg or similar to convert and merge them correctly.

Not sure if you are looking to create one large single playable audio file or just trying to condense data; if the latter, then I am also working on a python library/cli tool called s3-tar which can tar or tar.gz many files into an archive.

@vjkholiya123

This comment has been minimized.

Copy link

@vjkholiya123 vjkholiya123 commented Feb 18, 2020

@xtream1101, Thanks for your response. I am trying the create a single WAV file from multiple WAV files. I am able to use SOX in lambda function but the problem in lambda function is the 500MB /tmp storage and my WAV files are way larger than it. By using SOX I am able to concat the WAV files by first downloading the individual WAV files from bucket and storing them into /tmp storage and then running SOX command over those WAV files and storing the output in /tmp storage only and then uploading it to S3.
But I don't want to store in /tmp storage as it is very less for me so I tried concatenating the buffer data got while downloading the WAV files.

@xtream1101

This comment has been minimized.

Copy link

@xtream1101 xtream1101 commented Feb 18, 2020

There are 2 solutions for this that I see,

  1. Do not write the files to disk, and keep in memory since lambdas have a max of 3GB
  2. If 3 GB is still not enough space, then I would suggest you move this process to a fargate task which has much larger disk/memory options.
@jacquilevitan

This comment has been minimized.

Copy link

@jacquilevitan jacquilevitan commented Jul 14, 2020

does this work for MP4 files??

@Jatinder-Luthra

This comment has been minimized.

Copy link

@Jatinder-Luthra Jatinder-Luthra commented Jul 19, 2020

I am using python 3.7 to concatenate the files. Getting below error with this part of code:
len(filter(lambda x: x[0].endswith(suffix), _list_all_objects_with_size(s3, folder)))

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: object of type 'filter' has no len()

Any help/thoughts please?

@HugoGeeves

This comment has been minimized.

Copy link

@HugoGeeves HugoGeeves commented Sep 21, 2020

Really really like this, helped me out a lot, have been looking into using multipart upload to do the heavy lifting and this does it perfectly 👍

@HugoGeeves

This comment has been minimized.

Copy link

@HugoGeeves HugoGeeves commented Sep 21, 2020

I am using python 3.7 to concatenate the files. Getting below error with this part of code:
len(filter(lambda x: x[0].endswith(suffix), _list_all_objects_with_size(s3, folder)))

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: object of type 'filter' has no len()

Any help/thoughts please?

I think that in Python 3 filter returns a lazy iterator rather than a list, so it doesn't actually have a length. You can wrap the filter(...) call in list(...) where the parts list is defined (in collect_parts) to solve this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.