Skip to content

Instantly share code, notes, and snippets.

@azimbabu
Created December 22, 2018 00:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save azimbabu/d9dc9e05ee008875325472d598924df8 to your computer and use it in GitHub Desktop.
Save azimbabu/d9dc9e05ee008875325472d598924df8 to your computer and use it in GitHub Desktop.
package s3;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkClientException;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.*;
import com.google.common.base.Stopwatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class S3ObjectMerger {
private static Logger LOGGER = LoggerFactory.getLogger(S3ObjectMerger.class);
private static final long MAX_BYTES = 100 * 1024 * 1024;
public static void main(String[] args) {
String sourceBucketName = "test-source-bucket";
String sourceKeyPrefix = "test-source-key-prefix-";
String destinationBucketName = "test-destination-bucket;
String destinationKeyPrefix = sourceKeyPrefix + "objects-combined";
Stopwatch stopwatch = Stopwatch.createStarted();
try {
AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
.withCredentials(new ProfileCredentialsProvider("profile-1"))
.withRegion("us-west-2")
.build();
List<S3ObjectSummary> sourceObjectSummaries = fetchSourceObjects(sourceBucketName, sourceKeyPrefix, s3Client);
int sourceKeyIndex = 0;
int destinationFileNumber = 0;
while (sourceKeyIndex < sourceObjectSummaries.size()) {
sourceKeyIndex = copyCurrentBatch(sourceBucketName,
destinationBucketName,
destinationKeyPrefix,
s3Client,
sourceObjectSummaries,
sourceKeyIndex,
destinationFileNumber);
destinationFileNumber++;
}
deleteSourceObjects(sourceBucketName, s3Client, sourceObjectSummaries);
} catch (AmazonServiceException ex) {
LOGGER.error("Amazon service exception={}", ex);
} catch (SdkClientException ex) {
LOGGER.error("Amazon sdk client exception={}", ex);
}
stopwatch.stop();
LOGGER.info("Time taken={} seconds", stopwatch.elapsed(TimeUnit.SECONDS));
}
private static int copyCurrentBatch(String sourceBucketName, String destinationBucketName,
String destinationKeyPrefix, AmazonS3 s3Client,
List<S3ObjectSummary> sourceObjectSummaries, int sourceKeyIndex,
int destinationFileNumber) {
// Initiate the multipart upload.
String destinationObjectKey = destinationKeyPrefix + destinationFileNumber;
InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(destinationBucketName,
destinationObjectKey);
InitiateMultipartUploadResult initResult = s3Client.initiateMultipartUpload(initRequest);
long bytePosition = 0;
int partNum = 1;
List<CopyPartResult> copyResponses = new ArrayList<>();
while (bytePosition < MAX_BYTES && sourceKeyIndex < sourceObjectSummaries.size()) {
S3ObjectSummary sourceObject = sourceObjectSummaries.get(sourceKeyIndex);
CopyPartRequest copyRequest = new CopyPartRequest()
.withSourceBucketName(sourceBucketName)
.withSourceKey(sourceObject.getKey())
.withDestinationBucketName(destinationBucketName)
.withDestinationKey(destinationObjectKey)
.withUploadId(initResult.getUploadId())
.withFirstByte(bytePosition)
.withPartNumber(partNum++);
copyResponses.add(s3Client.copyPart(copyRequest));
bytePosition += sourceObject.getSize();
sourceKeyIndex++;
}
// Complete the upload request to concatenate all uploaded parts and make the copied object available.
CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(
destinationBucketName,
destinationObjectKey,
initResult.getUploadId(),
getETags(copyResponses));
s3Client.completeMultipartUpload(completeRequest);
return sourceKeyIndex;
}
private static List<S3ObjectSummary> fetchSourceObjects(String sourceBucketName,
String sourceKeyPrefix,
AmazonS3 s3Client) {
ListObjectsV2Request listRequest = new ListObjectsV2Request()
.withBucketName(sourceBucketName).
withPrefix(sourceKeyPrefix);
ListObjectsV2Result listResult = s3Client.listObjectsV2(listRequest);
List<S3ObjectSummary> sourceObjectSummaries = listResult.getObjectSummaries();
for (S3ObjectSummary objectSummary : sourceObjectSummaries) {
// size in bytes
LOGGER.info("Source object key={}, size={} bytes", objectSummary.getKey(), objectSummary.getSize());
}
return sourceObjectSummaries;
}
private static void deleteSourceObjects(String sourceBucketName, AmazonS3 s3Client,
List<S3ObjectSummary> sourceObjectSummaries) {
String[] sourceKeys = sourceObjectSummaries.stream().map(S3ObjectSummary::getKey).toArray(String[]::new);
DeleteObjectsRequest multiObjectDeleteRequest = new DeleteObjectsRequest(sourceBucketName)
.withKeys(sourceKeys)
.withQuiet(false);
// Verify that the objects were deleted successfully.
DeleteObjectsResult delObjRes = s3Client.deleteObjects(multiObjectDeleteRequest);
int successfulDeletes = delObjRes.getDeletedObjects().size();
LOGGER.info("{} objects successfully deleted.", successfulDeletes);
}
// This is a helper function to construct a list of ETags.
private static List<PartETag> getETags(List<CopyPartResult> responses) {
List<PartETag> etags = new ArrayList<>();
for (CopyPartResult response : responses) {
etags.add(new PartETag(response.getPartNumber(), response.getETag()));
}
return etags;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment