AWS Multipart Upload
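An AWS Lambda handler that merges many S3 objects into one via a multipart upload: objects of at least 5 MiB are copied server-side with `UploadPartCopy`, while smaller objects are buffered in memory and combined with `UploadPart`. A minimal invocation sketch, assuming `BucketName` and `ObjectKey` are plain string aliases; the bucket and keys are hypothetical:

// Sketch only; bucket and keys are illustrative.
import { main as mergeReports } from "./multipartUpload";

const { signedUrl } = await mergeReports({
  bucket: "my-reports-bucket",
  keys: ["reports/part-0001.csv", "reports/part-0002.csv"],
});
console.log(signedUrl); // Presigned GET URL for the merged object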
multipartUpload.ts
import type { CompletedPart } from "@aws-sdk/client-s3";
import type {
  BucketName,
  ContentEncoding,
  ContentLength,
  ContentType,
  ETag,
  MultipartUploadId,
  ObjectKey,
} from "@utils/s3";
import { abortMultipartUpload, completeMultipartUpload, createMultipartUpload, getGetObjectSignedUrl } from "@utils/s3";
import { partition } from "lodash";
import {
  calculateUploadPartCopyRequests,
  getKeyName,
  getPartMetadata,
  groupPartsByLength,
  lambdaMemorySizeMiB,
  minPartSizeMiB,
  uploadBigParts,
  uploadSmallParts,
} from "./utils";

type MergeReportsRequest = {
  bucket: BucketName;
  keys: ObjectKey[];
};

type MergeReportsResponse = {
  signedUrl?: string;
};

export type Part = {
  bucket: BucketName;
  key: ObjectKey;
  uploadId: MultipartUploadId;
  partKey: ObjectKey;
  contentLength: ContentLength;
  contentType?: ContentType;
  contentEncoding?: ContentEncoding;
  eTag?: ETag;
  start?: number;
  end?: number;
};
/**
 * Merges a list of S3 objects by appending their contents into a single S3 object
 * through multipart uploads.
 *
 * Take note of [multipart upload limits](https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html):
 * each part other than the last must be between 5 MiB and 5 GiB in size,
 * and an upload may have at most 10,000 parts.
 *
 * @param event Event from Step Function containing a list of S3 object paths.
 * @returns Response containing a presigned GET URL for the merged object.
 */
const mergeReports = async (event: MergeReportsRequest): Promise<MergeReportsResponse> => {
  const { bucket, keys } = event;
  let signedUrl: string | undefined;
  let partNumber = 1;
  const completedParts: CompletedPart[] = [];
  const targetKey = getKeyName();
  const { UploadId: uploadId } = await createMultipartUpload({
    Bucket: bucket,
    Key: targetKey,
  });
  if (uploadId) {
    try {
      const partMetadata = await getPartMetadata(bucket, targetKey, uploadId, keys);
      // Multipart upload parts have a size limit of [5 MiB, 5 GiB],
      // so small and big parts are handled differently.
      const [smallParts, bigParts] = partition(partMetadata, (part) => part.contentLength < minPartSizeMiB);
      if (bigParts.length > 0) {
        // Copy big parts server-side with `UploadPartCopy`.
        const uploadPartCopyRequests = calculateUploadPartCopyRequests(bigParts, partNumber);
        const completedBigParts = await uploadBigParts(uploadPartCopyRequests, partNumber);
        completedParts.push(...completedBigParts);
        partNumber += completedBigParts.length;
      }
      if (smallParts.length > 0) {
        /**
         * Parts other than the last must be at least 5 MiB.
         *
         * To satisfy this, small objects are fetched in batches that stay below the
         * Lambda memory limit, since their contents are held in memory. Each batch
         * is concatenated and uploaded as a single part using `UploadPart`.
         */
        const smallPartGroups = groupPartsByLength(smallParts, lambdaMemorySizeMiB);
        // Process batches sequentially (for...of instead of Promise.all) since
        // each batch may take up most of the Lambda's available memory.
        for (const parts of smallPartGroups) {
          const completedSmallParts = await uploadSmallParts(parts, partNumber);
          completedParts.push(...completedSmallParts);
          partNumber += completedSmallParts.length;
        }
      }
      // A multipart upload supports at most 10,000 parts.
      if (completedParts.length > 10000) {
        throw new Error("Too many multipart parts");
      }
      await completeMultipartUpload({
        Bucket: bucket,
        Key: targetKey,
        UploadId: uploadId,
        MultipartUpload: { Parts: completedParts },
      });
    } catch (e) {
      // Abort on failure so the unfinished upload does not accrue storage charges.
      await abortMultipartUpload({ Bucket: bucket, Key: targetKey, UploadId: uploadId });
      throw e;
    }
    signedUrl = await getGetObjectSignedUrl({ Bucket: bucket, Key: targetKey });
  }
  return {
    signedUrl,
  };
};

export const main = mergeReports;
utils.ts
import type { Part } from "./multipartUpload";
import type { CompletedPart, UploadPartCopyCommandInput } from "@aws-sdk/client-s3";
import type { BucketName, ETag, MultipartUploadId, ObjectKey, PartNumber } from "@utils/s3";
import { getObject, headObject, uploadPart, uploadPartCopy } from "@utils/s3";
import KSUID from "ksuid";
import { sumBy } from "lodash";

// All size constants are expressed in bytes.
export const lambdaMemorySizeMiB = 80 * 1024 * 1024; // 80 MiB; conservative limit for parts held in memory
export const minPartSizeMiB = 5 * 1024 * 1024; // 5 MiB
const maxPartSizeMiB = 5 * 1024 * 1024 * 1024; // 5 GiB
/**
 * Generates a formatted key name for an S3 object.
 *
 * @param prefix Prefix of S3 object.
 * @param suffix Suffix of S3 object.
 * @returns Formatted key of S3 object.
 */
export function getKeyName(prefix = "merged", suffix = "csv") {
  const now = new Date();
  const ksuid = KSUID.randomSync().string;
  return `${prefix}/${now.toISOString()}-${ksuid}.${suffix}`;
}
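
// Illustrative output (timestamp and KSUID are placeholders, not real values):
//   getKeyName()                  → "merged/2023-07-26T15:02:00.000Z-<27-char KSUID>.csv"
//   getKeyName("exports", "json") → "exports/<ISO timestamp>-<KSUID>.json"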
/**
 * Retrieves metadata of each S3 object using `HeadObject`.
 *
 * @param bucket S3 bucket name.
 * @param key S3 object key to multipart upload to.
 * @param uploadId Multipart upload ID.
 * @param keys Array of S3 object keys of parts.
 * @returns Promise containing an array of object metadata.
 */
export async function getPartMetadata(
  bucket: BucketName,
  key: ObjectKey,
  uploadId: MultipartUploadId,
  keys: ObjectKey[]
): Promise<Part[]> {
  const partMetadata = await Promise.all(
    keys.map(async (partKey) => {
      const {
        ContentLength: contentLength,
        ContentEncoding: contentEncoding,
        ContentType: contentType,
        ETag: eTag,
      } = await headObject({ Bucket: bucket, Key: partKey });
      return { bucket, key, uploadId, partKey, contentLength: contentLength ?? 0, contentEncoding, contentType, eTag };
    })
  );
  return partMetadata;
}
/**
 * Groups parts into sub-arrays whose total content lengths stay below the given limit.
 * Parts larger than the limit are first split into byte ranges no bigger than the limit.
 *
 * @param parts Array of part metadata.
 * @param limit Size limit of each group, in bytes.
 * @returns Array of part groups, each totalling less than the limit.
 */
export function groupPartsByLength(parts: Part[], limit: number): Part[][] {
  const groups: Part[][] = [];
  let currentGroup = 0;

  function joinGroup(part: Part, index: number) {
    if (!groups[index]) {
      groups[index] = [part];
    } else {
      groups[index].push(part);
    }
  }

  function getGroupSum(index: number) {
    if (!groups[index]) {
      groups[index] = [];
    }
    return sumBy(groups[index], "contentLength");
  }

  // Split a part larger than the limit into byte ranges of at most `limit` bytes.
  function splitPart(part: Part) {
    const subParts: Part[] = [];
    const { contentLength } = part;
    const count = Math.ceil(contentLength / limit);
    for (let i = 0; i < count; i += 1) {
      const start = i * limit;
      const end = Math.min((i + 1) * limit - 1, contentLength - 1);
      const subPartLength = end - start + 1;
      subParts.push({ ...part, contentLength: subPartLength, start, end });
    }
    return subParts;
  }

  function findAndJoinGroup(part: Part) {
    const isPartBig = part.contentLength > limit;
    const groupSum = getGroupSum(currentGroup);
    const canJoinGroup = part.contentLength + groupSum < limit;
    if (isPartBig) {
      const subParts = splitPart(part);
      subParts.forEach((subPart) => findAndJoinGroup(subPart));
    } else if (canJoinGroup) {
      joinGroup(part, currentGroup);
    } else {
      currentGroup += 1;
      joinGroup(part, currentGroup);
    }
  }

  parts.forEach((part) => findAndJoinGroup(part));
  return groups.filter((group) => group.length > 0);
}
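
// Worked example (hypothetical): four parts grouped under a 12-byte limit.
// The demo objects carry only `contentLength`, which is all the function reads;
// the `as Part[]` cast stands in for the branded `@utils/s3` types.
const demoGroups = groupPartsByLength(
  [5, 5, 4, 30].map((contentLength) => ({ contentLength })) as Part[],
  12
);
// Group lengths: [[5, 5], [4], [12], [12], [6]]; the 30-byte part is split
// into 12 + 12 + 6 before grouping.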
/**
 * Returns an array of `UploadPartCopyCommandInput` to facilitate uploading of parts
 * of at least 5 MiB using `UploadPartCopy`.
 *
 * @param parts Array of part metadata.
 * @param startingPartNumber Initial part number.
 * @returns Array of `UploadPartCopyCommandInput`.
 */
export function calculateUploadPartCopyRequests(
  parts: Part[],
  startingPartNumber: number
): UploadPartCopyCommandInput[] {
  // Parts must be between 5 MiB and 5 GiB in size.
  const minimumSizedParts = parts.flatMap((part) => {
    const { contentLength: length } = part;
    if (length <= maxPartSizeMiB) {
      return part;
    }
    // Split oversized objects into roughly equal chunks so every chunk is at
    // most 5 GiB yet stays comfortably above the 5 MiB minimum; fixed-size
    // chunks could leave a trailing mid-upload chunk below 5 MiB.
    const count = Math.ceil(length / maxPartSizeMiB);
    const chunkLength = Math.ceil(length / count);
    const resizedParts: Part[] = [];
    for (let start = 0; start < length; start += chunkLength) {
      const end = Math.min(start + chunkLength - 1, length - 1);
      resizedParts.push({ ...part, start, end, contentLength: end - start + 1 });
    }
    return resizedParts;
  });
  const instructions: UploadPartCopyCommandInput[] = minimumSizedParts.map(
    ({ bucket, key, uploadId, partKey, start, end }, index) => {
      const hasByteRange = start !== undefined && end !== undefined;
      return {
        Bucket: bucket,
        Key: key,
        UploadId: uploadId,
        PartNumber: startingPartNumber + index,
        CopySource: `${bucket}/${partKey}`,
        CopySourceRange: hasByteRange ? `bytes=${start}-${end}` : undefined,
      };
    }
  );
  return instructions;
}
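
// Example (hypothetical): a 12 GiB object exceeds the 5 GiB maximum, so it is
// split into count = 3 copy requests of chunkLength = 4 GiB each, giving
// CopySourceRange values:
//   bytes=0-4294967295
//   bytes=4294967296-8589934591
//   bytes=8589934592-12884901887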
/**
 * Uploads parts with content length of at least 5 MiB using `UploadPartCopy`.
 * This copies data from an existing S3 object as a part of a multipart upload.
 *
 * @param requests Array of `UploadPartCopyCommandInput`.
 * @param startingPartNumber Initial part number.
 * @returns Promise containing an array of uploaded parts.
 */
export async function uploadBigParts(
  requests: UploadPartCopyCommandInput[],
  startingPartNumber: PartNumber
): Promise<CompletedPart[]> {
  const responses = await Promise.all(requests.map((request) => uploadPartCopy(request)));
  // Pair each ETag with its part number before filtering, so part numbers stay
  // aligned with their requests even if a response is missing an ETag.
  const completedParts = responses
    .map((response, index): CompletedPart => ({
      ETag: response.CopyPartResult?.ETag,
      PartNumber: startingPartNumber + index,
    }))
    .filter((part) => !!part.ETag);
  return completedParts;
}
/**
 * Uploads parts with content lengths less than 5 MiB by retrieving them using `GetObject`,
 * merging their contents into a single part, and uploading that part using `UploadPart`.
 *
 * @param parts Array of part metadata.
 * @param startingPartNumber Initial part number.
 * @returns Promise containing an array of uploaded parts.
 */
export async function uploadSmallParts(parts: Part[], startingPartNumber: number): Promise<CompletedPart[]> {
  // Fetch the small objects, by byte range where one is given.
  const objects = await Promise.all(
    parts.map(({ bucket, partKey, start, end }) => {
      const hasByteRange = start !== undefined && end !== undefined;
      return getObject({ Bucket: bucket, Key: partKey, Range: hasByteRange ? `bytes=${start}-${end}` : undefined });
    })
  );
  const bodies = await Promise.all(objects.map((object) => object.Body?.transformToString()));
  const nonEmptyBodies = bodies.filter((body): body is string => !!body);
  // Combine the small objects into a single part body.
  const joinedBodies = nonEmptyBodies.join("\n");
  const { bucket, key, uploadId } = parts[0]; // All parts share the same multipart upload metadata
  const response = await uploadPart({
    Bucket: bucket,
    Key: key,
    UploadId: uploadId,
    PartNumber: startingPartNumber,
    Body: joinedBodies,
  });
  return [{ ETag: response.ETag, PartNumber: startingPartNumber }];
}
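The `@utils/s3` module is not included in this gist. Below is a minimal sketch of what it might look like, assuming thin wrappers around `@aws-sdk/client-s3` v3 and `@aws-sdk/s3-request-presigner`, with the branded types simplified to plain aliases:

// utils/s3.ts; hypothetical reconstruction, not part of the original gist.
import {
  AbortMultipartUploadCommand,
  CompleteMultipartUploadCommand,
  CreateMultipartUploadCommand,
  GetObjectCommand,
  HeadObjectCommand,
  S3Client,
  UploadPartCommand,
  UploadPartCopyCommand,
} from "@aws-sdk/client-s3";
import type {
  AbortMultipartUploadCommandInput,
  CompleteMultipartUploadCommandInput,
  CreateMultipartUploadCommandInput,
  GetObjectCommandInput,
  HeadObjectCommandInput,
  UploadPartCommandInput,
  UploadPartCopyCommandInput,
} from "@aws-sdk/client-s3";
import { getSignedUrl } from "@aws-sdk/s3-request-presigner";

// Plain aliases; the original module may use branded types instead.
export type BucketName = string;
export type ObjectKey = string;
export type MultipartUploadId = string;
export type ETag = string;
export type PartNumber = number;
export type ContentLength = number;
export type ContentType = string;
export type ContentEncoding = string;

const client = new S3Client({});

export const createMultipartUpload = (input: CreateMultipartUploadCommandInput) =>
  client.send(new CreateMultipartUploadCommand(input));
export const completeMultipartUpload = (input: CompleteMultipartUploadCommandInput) =>
  client.send(new CompleteMultipartUploadCommand(input));
export const abortMultipartUpload = (input: AbortMultipartUploadCommandInput) =>
  client.send(new AbortMultipartUploadCommand(input));
export const uploadPart = (input: UploadPartCommandInput) => client.send(new UploadPartCommand(input));
export const uploadPartCopy = (input: UploadPartCopyCommandInput) => client.send(new UploadPartCopyCommand(input));
export const headObject = (input: HeadObjectCommandInput) => client.send(new HeadObjectCommand(input));
export const getObject = (input: GetObjectCommandInput) => client.send(new GetObjectCommand(input));
export const getGetObjectSignedUrl = (input: GetObjectCommandInput) =>
  getSignedUrl(client, new GetObjectCommand(input), { expiresIn: 3600 });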