Skip to content

Instantly share code, notes, and snippets.

@juiceblender
Created February 27, 2016 06:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save juiceblender/49a75481d38f45ef852e to your computer and use it in GitHub Desktop.
Save juiceblender/49a75481d38f45ef852e to your computer and use it in GitHub Desktop.
BackupTest
public class BackupTest {
private static final BasicAWSCredentials awsCred = new BasicAWSCredentials("Woot", "Wow");
private static final AmazonS3Client s3Client = new AmazonS3Client(awsCred);
private static final TransferManager transferManager = new TransferManager(s3Client);
private static final String backupBucket = "test-instaclustr-backup-blues";
private static final Path largeFile = Paths.get("/data/MegaTestFile");
public static class RateLimitedInputStream extends InputStream {
private final InputStream delegate;
final RateLimiter limiter;
public RateLimitedInputStream(final InputStream delegate, final RateLimiter limiter) {
this.delegate = delegate;
this.limiter = limiter;
}
@Override
public int read() throws IOException {
limiter.acquire();
return delegate.read();
}
@Override
public int read(final byte[] b) throws IOException {
limiter.acquire(b.length);
return delegate.read(b);
}
@Override
public int read(final byte[] b, final int off, final int len) throws IOException {
limiter.acquire(len);
return delegate.read(b, off, len);
}
@Override
public boolean markSupported() {
return delegate.markSupported();
}
@Override
public int available() throws IOException {
return delegate.available();
}
@Override
public void close() throws IOException {
delegate.close();
}
@Override
public synchronized void mark(final int readlimit) {
delegate.mark(readlimit);
}
@Override
public synchronized void reset() throws IOException {
delegate.reset();
}
}
final static Function<InputStream, InputStream> wrapWithRateLimitedStream = ((Supplier<Function<InputStream, InputStream>>) () -> {
final RateLimiter rateLimiter = RateLimiter.create(10*1024*1024);
return (s) -> new RateLimitedInputStream(s, rateLimiter);
}).get();
public static void main(String[] args) throws Exception {
try (final InputStream s = Files.newInputStream(largeFile)) {
final InputStream rateLimitedStream = wrapWithRateLimitedStream.apply(s);
final Upload upload = transferManager.upload(backupBucket, "Sadness", rateLimitedStream, new ObjectMetadata() {{
setContentLength(Files.size(largeFile));
}});
upload.addProgressListener((ProgressEvent progressEvent) -> {
int i = 1;
if (progressEvent.getEventType() == ProgressEventType.TRANSFER_PART_COMPLETED_EVENT) {
System.out.println("Successfully uploaded part for Sadness." + i);
i++;
}
});
upload.waitForCompletion();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment