@yongjiajie
Last active July 26, 2023 15:02
AWS Multipart Upload
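// multipartUpload.ts — Lambda handler (imported by the utility module below as "./multipartUpload").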
import type { CompletedPart } from "@aws-sdk/client-s3";
import type {
  BucketName,
  ContentEncoding,
  ContentLength,
  ContentType,
  ETag,
  MultipartUploadId,
  ObjectKey,
} from "@utils/s3";
import { abortMultipartUpload, completeMultipartUpload, createMultipartUpload, getGetObjectSignedUrl } from "@utils/s3";
import { partition } from "lodash";
import {
  calculateUploadPartCopyRequests,
  getPartMetadata,
  lambdaMemorySizeMiB,
  minPartSizeMiB,
  groupPartsByLength,
  uploadBigParts,
  uploadSmallParts,
  getKeyName,
} from "./utils";
type MergeReportsRequest = {
  bucket: BucketName;
  keys: ObjectKey[];
};

type MergeReportsResponse = {
  signedUrl?: string;
};

export type Part = {
  bucket: BucketName;
  key: ObjectKey;
  uploadId: MultipartUploadId;
  partKey: ObjectKey;
  contentLength: ContentLength;
  contentType?: ContentType;
  contentEncoding?: ContentEncoding;
  eTag?: ETag;
  start?: number;
  end?: number;
};
/**
 * Merges a list of S3 objects by appending their contents into a single S3 object
 * through multipart uploads.
 *
 * Take note of [multipart upload limits](https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html).
 *
 * The content length of each multipart upload part must be between 5 MiB and 5 GiB, except for the last part.
 * There is a maximum of 10,000 parts per upload.
 *
 * @param event Event from the Step Function containing the S3 bucket and the keys of the objects to merge.
 * @returns A presigned GET URL for the merged object, if the upload succeeded.
 */
const mergeReports = async (event: MergeReportsRequest): Promise<MergeReportsResponse> => {
  const { bucket, keys } = event;

  let signedUrl: string | undefined;
  let partNumber = 1;
  const completedParts: CompletedPart[] = [];

  const targetKey = getKeyName();
  const { UploadId: uploadId } = await createMultipartUpload({
    Bucket: bucket,
    Key: targetKey,
  });

  if (uploadId) {
    const partLengths = await getPartMetadata(bucket, targetKey, uploadId, keys);

    // Multipart upload parts have a size limit of [5 MiB, 5 GiB].
    // We have to handle small and big parts differently.
    const [smallParts, bigParts] = partition(partLengths, (part) => part.contentLength < minPartSizeMiB);
    if (bigParts.length > 0) {
      // Upload big parts with `UploadPartCopy`.
      const uploadPartCopyRequests = calculateUploadPartCopyRequests(bigParts, partNumber);
      const completedBigParts = await uploadBigParts(uploadPartCopyRequests, partNumber);
      completedParts.push(...completedBigParts);
      partNumber += completedBigParts.length;
    }
    if (smallParts.length > 0) {
      /**
       * Parts have a minimum size limit of 5 MiB.
       *
       * To fulfil this, we retrieve small parts in partitions below the default Lambda
       * memory limit, since we have to hold their contents in memory.
       *
       * The contents of each partition are merged and uploaded as a single part using `UploadPart`.
       */
      const smallPartGroups = groupPartsByLength(smallParts, lambdaMemorySizeMiB);

      /**
       * Using for...of instead of Promise.all since we want these requests to be
       * fulfilled sequentially, as each may take up all of the memory in the Lambda.
       */
      for (const parts of smallPartGroups) {
        const completedSmallParts = await uploadSmallParts(parts, partNumber);
        completedParts.push(...completedSmallParts);
        partNumber += completedSmallParts.length;
      }
    }
    // Maximum number of parts per upload is 10,000.
    if (completedParts.length > 10000) {
      await abortMultipartUpload({ Bucket: bucket, Key: targetKey, UploadId: uploadId });
      throw new Error(`Too many multipart upload parts: ${completedParts.length}`);
    }
    await completeMultipartUpload({
      Bucket: bucket,
      Key: targetKey,
      UploadId: uploadId,
      MultipartUpload: { Parts: completedParts },
    });

    signedUrl = await getGetObjectSignedUrl({ Bucket: bucket, Key: targetKey });
  }
  return {
    signedUrl,
  };
};
export const main = mergeReports;
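/**
 * Example invocation (hypothetical bucket and keys), assuming the Step Function state
 * passes the bucket name and the part object keys directly in its input:
 *
 *   await main({
 *     bucket: "example-reports-bucket",
 *     keys: ["reports/part-0001.csv", "reports/part-0002.csv"],
 *   });
 */

// utils.ts — helper functions for the handler above (imported there as "./utils").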
import type { Part } from "./multipartUpload";
import type { CompletedPart, UploadPartCopyCommandInput } from "@aws-sdk/client-s3";
import type { BucketName, ETag, MultipartUploadId, ObjectKey, PartNumber } from "@utils/s3";
import { getObject, headObject, uploadPart, uploadPartCopy } from "@utils/s3";
import KSUID from "ksuid";
import { sumBy } from "lodash";
// Note: although the names end in `MiB`, all three constants hold values in bytes.
export const lambdaMemorySizeMiB = 80 * 1024 * 1024; // 80 MiB: conservative memory budget for parts we hold in memory
export const minPartSizeMiB = 5 * 1024 * 1024; // 5 MiB: minimum multipart upload part size
const maxPartSizeMiB = 5 * 1024 * 1024 * 1024; // 5 GiB: maximum multipart upload part size
/**
 * Generates a formatted key name for an S3 object.
 *
 * @param prefix Prefix of the S3 object key.
 * @param suffix Suffix (file extension) of the S3 object key.
 * @returns Formatted key of the S3 object.
 */
export function getKeyName(prefix = "merged", suffix = "csv") {
  const now = new Date();
  const ksuid = KSUID.randomSync().string;
  return `${prefix}/${now.toISOString()}-${ksuid}.${suffix}`;
}
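// Example output (hypothetical timestamp and KSUID):
//   getKeyName() => "merged/2023-07-26T15:02:00.000Z-2SxVabc123XYZdefGHIJklmnop4.csv"
//   getKeyName("exports", "json") => "exports/2023-07-26T15:02:00.000Z-2SxVabc123XYZdefGHIJklmnop4.json"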
/**
 * Retrieves metadata of each S3 object using `HeadObject`.
 *
 * @param bucket S3 bucket name.
 * @param key Destination S3 object key of the multipart upload.
 * @param uploadId Multipart upload ID.
 * @param keys Array of S3 object keys of the parts.
 * @returns Promise containing an array of part metadata.
 */
export async function getPartMetadata(
  bucket: BucketName,
  key: ObjectKey,
  uploadId: MultipartUploadId,
  keys: ObjectKey[]
): Promise<Part[]> {
  const partMetadata = await Promise.all(
    keys.map(async (partKey) => {
      const {
        ContentLength: contentLength,
        ContentEncoding: contentEncoding,
        ContentType: contentType,
        ETag: eTag,
      } = await headObject({ Bucket: bucket, Key: partKey });
      return { bucket, key, uploadId, partKey, contentLength: contentLength ?? 0, contentEncoding, contentType, eTag };
    })
  );
  return partMetadata;
}
/**
 * Groups parts by creating sub-arrays of part metadata whose content length totals are less than the limit.
 * Parts larger than the limit are split into sub-parts (byte ranges) before being grouped.
 *
 * @param parts Array of part metadata.
 * @param limit Maximum total content length of each group, in bytes.
 * @returns Array of groups of parts, each with content lengths totalling less than the limit.
 */
export function groupPartsByLength(parts: Part[], limit: number): Part[][] {
  const groups: Part[][] = [];
  let currentGroup = 0;

  function joinGroup(part: Part, index: number) {
    if (!groups[index]) {
      groups[index] = [part];
    } else {
      groups[index].push(part);
    }
  }

  function getGroupSum(index: number) {
    if (!groups[index]) {
      groups[index] = [];
    }
    return sumBy(groups[index], "contentLength");
  }

  // Splits a part larger than the limit into contiguous byte ranges of at most `limit` bytes each.
  function splitPart(part: Part) {
    const subParts: Part[] = [];
    const { contentLength } = part;
    const count = Math.ceil(contentLength / limit);
    for (let i = 0; i < count; i += 1) {
      const start = i * limit;
      let end = (i + 1) * limit - 1;
      end = Math.min(end, contentLength - 1);
      const subPartLength = end - start + 1;
      subParts.push({ ...part, contentLength: subPartLength, start, end });
    }
    return subParts;
  }

  function findAndJoinGroup(part: Part) {
    const isPartBig = part.contentLength > limit;
    const groupSum = getGroupSum(currentGroup);
    const canJoinGroup = part.contentLength + groupSum < limit;
    if (isPartBig) {
      const subParts = splitPart(part);
      subParts.forEach((subPart) => findAndJoinGroup(subPart));
    } else if (canJoinGroup) {
      joinGroup(part, currentGroup);
    } else {
      currentGroup += 1;
      joinGroup(part, currentGroup);
    }
  }

  parts.forEach((part) => findAndJoinGroup(part));

  // `getGroupSum` may have created empty groups; drop them.
  const nonEmptyGroups = groups.filter((group) => group.length > 0);
  return nonEmptyGroups;
}
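/**
 * Worked example (hypothetical sizes, limit of 80 MiB):
 *
 *   Input part content lengths: [30 MiB, 30 MiB, 30 MiB, 200 MiB]
 *   - The first two 30 MiB parts fit under the limit together => group 0: [30 MiB, 30 MiB]
 *   - The third 30 MiB part would push group 0 past 80 MiB    => group 1: [30 MiB]
 *   - The 200 MiB part exceeds the limit, so it is split into byte ranges of at most
 *     80 MiB, which are then grouped in turn                  => groups 2-4: [80 MiB], [80 MiB], [40 MiB]
 */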
/**
 * Returns an array of `UploadPartCopyCommandInput` to facilitate uploading of parts with
 * a size of at least 5 MiB using `UploadPartCopy`.
 *
 * @param parts Array of part metadata.
 * @param startingPartNumber Initial part number.
 * @returns Array of `UploadPartCopyCommandInput`.
 */
export function calculateUploadPartCopyRequests(
  parts: Part[],
  startingPartNumber: number
): UploadPartCopyCommandInput[] {
  // Parts must have a size of [5 MiB, 5 GiB].
  const minimumSizedParts = parts.flatMap((part) => {
    const { contentLength: length } = part;
    if (length <= maxPartSizeMiB) {
      return part;
    }
    // Setting the max part length to 5 GiB - 5 MiB so the final part can be at least 5 MiB.
    const maxPartLength = maxPartSizeMiB - minPartSizeMiB;
    const resizedParts: Part[] = [];
    let start = 0;
    let end = maxPartLength - 1;
    resizedParts.push({ ...part, start, end, contentLength: end - start + 1 });
    while (end < length - 1) {
      start = end + 1;
      end = Math.min(start + maxPartLength - 1, length - 1);
      resizedParts.push({ ...part, start, end, contentLength: end - start + 1 });
    }
    return resizedParts;
  });

  const instructions: UploadPartCopyCommandInput[] = minimumSizedParts.map(
    ({ bucket, key, uploadId, partKey, start, end }, index) => {
      const hasByteRange = start !== undefined && end !== undefined;
      return {
        Bucket: bucket,
        Key: key,
        UploadId: uploadId,
        PartNumber: startingPartNumber + index,
        CopySource: `${bucket}/${partKey}`,
        CopySourceRange: hasByteRange ? `bytes=${start}-${end}` : undefined,
      };
    }
  );
  return instructions;
}
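/**
 * Worked example (hypothetical 12 GiB source object):
 *
 *   maxPartLength = 5 GiB - 5 MiB = 5,363,466,240 bytes, so the object is copied as
 *   three `UploadPartCopy` requests with these `CopySourceRange` values:
 *
 *     bytes=0-5363466239
 *     bytes=5363466240-10726932479
 *     bytes=10726932480-12884901887   (final range of roughly 2 GiB, well above the 5 MiB minimum)
 */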
/**
 * Uploads parts with content length of at least 5 MiB using `UploadPartCopy`.
 * This copies data from an existing S3 object as a part of a multipart upload.
 *
 * @param requests Array of `UploadPartCopyCommandInput`.
 * @param startingPartNumber Initial part number.
 * @returns Promise containing an array of uploaded parts.
 */
export async function uploadBigParts(
  requests: UploadPartCopyCommandInput[],
  startingPartNumber: PartNumber
): Promise<CompletedPart[]> {
  // `Promise.all` preserves request order, so part numbers can be assigned by index.
  const responses = await Promise.all(requests.map((request) => uploadPartCopy(request)));
  const entityTags: ETag[] = responses
    .map((response) => response.CopyPartResult?.ETag)
    .filter((tag): tag is string => !!tag);
  const completedParts: CompletedPart[] = entityTags.map((tag, index) => {
    return { ETag: tag, PartNumber: startingPartNumber + index };
  });
  return completedParts;
}
/**
 * Uploads parts with content lengths less than 5 MiB by retrieving them using `GetObject`,
 * merging their contents into a single part, and uploading this part using `UploadPart`.
 *
 * @param parts Array of part metadata.
 * @param startingPartNumber Initial part number.
 * @returns Promise containing an array of uploaded parts.
 */
export async function uploadSmallParts(parts: Part[], startingPartNumber: number): Promise<CompletedPart[]> {
  // Combine the small objects' contents into a single part.
  const objects = await Promise.all(
    parts.map(({ bucket, partKey, start, end }) => {
      const hasByteRange = start !== undefined && end !== undefined;
      return getObject({ Bucket: bucket, Key: partKey, Range: hasByteRange ? `bytes=${start}-${end}` : undefined });
    })
  );
  const bodies = await Promise.all(objects.map((object) => object.Body?.transformToString()));
  const nonEmptyBodies = bodies.filter((body): body is string => !!body);
  const joinedBodies = nonEmptyBodies.join("\n");

  const { bucket, key, uploadId } = parts[0]; // All parts share the same multipart upload metadata.
  const response = await uploadPart({
    Bucket: bucket,
    Key: key,
    UploadId: uploadId,
    PartNumber: startingPartNumber,
    Body: joinedBodies,
  });
  const tag = response.ETag;
  return [{ ETag: tag, PartNumber: startingPartNumber }];
}
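/**
 * The `@utils/s3` module imported above is not included in this gist. The sketch below
 * (a hypothetical file, e.g. s3.ts) shows one way its wrappers could be implemented:
 * thin functions forwarding commands to a shared `S3Client`. The type aliases and the
 * presigned URL expiry are assumptions, not the original implementation.
 */
import {
  S3Client,
  AbortMultipartUploadCommand,
  CompleteMultipartUploadCommand,
  CreateMultipartUploadCommand,
  GetObjectCommand,
  HeadObjectCommand,
  UploadPartCommand,
  UploadPartCopyCommand,
} from "@aws-sdk/client-s3";
import type {
  AbortMultipartUploadCommandInput,
  CompleteMultipartUploadCommandInput,
  CreateMultipartUploadCommandInput,
  GetObjectCommandInput,
  HeadObjectCommandInput,
  UploadPartCommandInput,
  UploadPartCopyCommandInput,
} from "@aws-sdk/client-s3";
import { getSignedUrl } from "@aws-sdk/s3-request-presigner";

// Assumed type aliases; the real `@utils/s3` may define these differently.
export type BucketName = string;
export type ObjectKey = string;
export type MultipartUploadId = string;
export type ContentLength = number;
export type ContentType = string;
export type ContentEncoding = string;
export type ETag = string;
export type PartNumber = number;

const client = new S3Client({});

export const createMultipartUpload = (input: CreateMultipartUploadCommandInput) =>
  client.send(new CreateMultipartUploadCommand(input));

export const completeMultipartUpload = (input: CompleteMultipartUploadCommandInput) =>
  client.send(new CompleteMultipartUploadCommand(input));

export const abortMultipartUpload = (input: AbortMultipartUploadCommandInput) =>
  client.send(new AbortMultipartUploadCommand(input));

export const headObject = (input: HeadObjectCommandInput) => client.send(new HeadObjectCommand(input));

export const getObject = (input: GetObjectCommandInput) => client.send(new GetObjectCommand(input));

export const uploadPart = (input: UploadPartCommandInput) => client.send(new UploadPartCommand(input));

export const uploadPartCopy = (input: UploadPartCopyCommandInput) => client.send(new UploadPartCopyCommand(input));

// A one-hour expiry is an assumption.
export const getGetObjectSignedUrl = (input: GetObjectCommandInput) =>
  getSignedUrl(client, new GetObjectCommand(input), { expiresIn: 3600 });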