Created
December 22, 2018 00:33
-
-
Save azimbabu/d9dc9e05ee008875325472d598924df8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package s3; | |
import com.amazonaws.AmazonServiceException; | |
import com.amazonaws.SdkClientException; | |
import com.amazonaws.auth.profile.ProfileCredentialsProvider; | |
import com.amazonaws.services.s3.AmazonS3; | |
import com.amazonaws.services.s3.AmazonS3ClientBuilder; | |
import com.amazonaws.services.s3.model.*; | |
import com.google.common.base.Stopwatch; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.TimeUnit; | |
public class S3ObjectMerger { | |
private static Logger LOGGER = LoggerFactory.getLogger(S3ObjectMerger.class); | |
private static final long MAX_BYTES = 100 * 1024 * 1024; | |
public static void main(String[] args) { | |
String sourceBucketName = "test-source-bucket"; | |
String sourceKeyPrefix = "test-source-key-prefix-"; | |
String destinationBucketName = "test-destination-bucket; | |
String destinationKeyPrefix = sourceKeyPrefix + "objects-combined"; | |
Stopwatch stopwatch = Stopwatch.createStarted(); | |
try { | |
AmazonS3 s3Client = AmazonS3ClientBuilder.standard() | |
.withCredentials(new ProfileCredentialsProvider("profile-1")) | |
.withRegion("us-west-2") | |
.build(); | |
List<S3ObjectSummary> sourceObjectSummaries = fetchSourceObjects(sourceBucketName, sourceKeyPrefix, s3Client); | |
int sourceKeyIndex = 0; | |
int destinationFileNumber = 0; | |
while (sourceKeyIndex < sourceObjectSummaries.size()) { | |
sourceKeyIndex = copyCurrentBatch(sourceBucketName, | |
destinationBucketName, | |
destinationKeyPrefix, | |
s3Client, | |
sourceObjectSummaries, | |
sourceKeyIndex, | |
destinationFileNumber); | |
destinationFileNumber++; | |
} | |
deleteSourceObjects(sourceBucketName, s3Client, sourceObjectSummaries); | |
} catch (AmazonServiceException ex) { | |
LOGGER.error("Amazon service exception={}", ex); | |
} catch (SdkClientException ex) { | |
LOGGER.error("Amazon sdk client exception={}", ex); | |
} | |
stopwatch.stop(); | |
LOGGER.info("Time taken={} seconds", stopwatch.elapsed(TimeUnit.SECONDS)); | |
} | |
private static int copyCurrentBatch(String sourceBucketName, String destinationBucketName, | |
String destinationKeyPrefix, AmazonS3 s3Client, | |
List<S3ObjectSummary> sourceObjectSummaries, int sourceKeyIndex, | |
int destinationFileNumber) { | |
// Initiate the multipart upload. | |
String destinationObjectKey = destinationKeyPrefix + destinationFileNumber; | |
InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(destinationBucketName, | |
destinationObjectKey); | |
InitiateMultipartUploadResult initResult = s3Client.initiateMultipartUpload(initRequest); | |
long bytePosition = 0; | |
int partNum = 1; | |
List<CopyPartResult> copyResponses = new ArrayList<>(); | |
while (bytePosition < MAX_BYTES && sourceKeyIndex < sourceObjectSummaries.size()) { | |
S3ObjectSummary sourceObject = sourceObjectSummaries.get(sourceKeyIndex); | |
CopyPartRequest copyRequest = new CopyPartRequest() | |
.withSourceBucketName(sourceBucketName) | |
.withSourceKey(sourceObject.getKey()) | |
.withDestinationBucketName(destinationBucketName) | |
.withDestinationKey(destinationObjectKey) | |
.withUploadId(initResult.getUploadId()) | |
.withFirstByte(bytePosition) | |
.withPartNumber(partNum++); | |
copyResponses.add(s3Client.copyPart(copyRequest)); | |
bytePosition += sourceObject.getSize(); | |
sourceKeyIndex++; | |
} | |
// Complete the upload request to concatenate all uploaded parts and make the copied object available. | |
CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest( | |
destinationBucketName, | |
destinationObjectKey, | |
initResult.getUploadId(), | |
getETags(copyResponses)); | |
s3Client.completeMultipartUpload(completeRequest); | |
return sourceKeyIndex; | |
} | |
private static List<S3ObjectSummary> fetchSourceObjects(String sourceBucketName, | |
String sourceKeyPrefix, | |
AmazonS3 s3Client) { | |
ListObjectsV2Request listRequest = new ListObjectsV2Request() | |
.withBucketName(sourceBucketName). | |
withPrefix(sourceKeyPrefix); | |
ListObjectsV2Result listResult = s3Client.listObjectsV2(listRequest); | |
List<S3ObjectSummary> sourceObjectSummaries = listResult.getObjectSummaries(); | |
for (S3ObjectSummary objectSummary : sourceObjectSummaries) { | |
// size in bytes | |
LOGGER.info("Source object key={}, size={} bytes", objectSummary.getKey(), objectSummary.getSize()); | |
} | |
return sourceObjectSummaries; | |
} | |
private static void deleteSourceObjects(String sourceBucketName, AmazonS3 s3Client, | |
List<S3ObjectSummary> sourceObjectSummaries) { | |
String[] sourceKeys = sourceObjectSummaries.stream().map(S3ObjectSummary::getKey).toArray(String[]::new); | |
DeleteObjectsRequest multiObjectDeleteRequest = new DeleteObjectsRequest(sourceBucketName) | |
.withKeys(sourceKeys) | |
.withQuiet(false); | |
// Verify that the objects were deleted successfully. | |
DeleteObjectsResult delObjRes = s3Client.deleteObjects(multiObjectDeleteRequest); | |
int successfulDeletes = delObjRes.getDeletedObjects().size(); | |
LOGGER.info("{} objects successfully deleted.", successfulDeletes); | |
} | |
// This is a helper function to construct a list of ETags. | |
private static List<PartETag> getETags(List<CopyPartResult> responses) { | |
List<PartETag> etags = new ArrayList<>(); | |
for (CopyPartResult response : responses) { | |
etags.add(new PartETag(response.getPartNumber(), response.getETag())); | |
} | |
return etags; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment