Skip to content

Instantly share code, notes, and snippets.

@coopernurse
Created September 14, 2017 13:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save coopernurse/ae9373879f2e6c8c343fdd39e1d3ea00 to your computer and use it in GitHub Desktop.
ReproDiskConsistency
package com.imprev.util;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.imprev.soa.util.*;
import org.apache.log4j.Logger;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Author: James Cooper - james@bitmechanic.com
* Date: 9/13/17
*/
/**
 * Stress tool that reproduces a suspected disk-consistency problem:
 * downloads randomly selected S3 objects in parallel, fsyncs each file to
 * disk, and verifies that the local file size matches the size reported by
 * the S3 listing. Size mismatches and download failures are counted and
 * reported at the end of the run.
 */
public class ReproDiskConsistency {

    private static final Logger log = Logger.getLogger(ReproDiskConsistency.class);

    /**
     * Entry point.
     *
     * Optional args:
     *   argv[0] - base directory for downloads (default: /tmp/diskrepro)
     *   argv[1] - runtime in seconds (default: 10)
     *   argv[2] - worker thread count (default: 10)
     */
    public static void main(String[] argv) throws Exception {
        Log4JInit.removeAppendersAndInit();
        String s3Bucket = "redacted";
        String s3Prefix = "redacted";
        File baseDir = new File("/tmp/diskrepro");
        int runtimeSeconds = 10;
        int threads = 10;
        if (argv.length > 0) {
            baseDir = new File(argv[0]);
        }
        if (argv.length > 1) {
            runtimeSeconds = Integer.parseInt(argv[1]);
        }
        if (argv.length > 2) {
            threads = Integer.parseInt(argv[2]);
        }
        log.info("baseDir: " + baseDir + " runtimeSeconds: " + runtimeSeconds + " threads: " + threads);
        AmazonS3Client s3 = new AmazonS3Client(new ProfileCredentialsProvider());
        ReproDiskConsistency repro = new ReproDiskConsistency(s3, s3Bucket, s3Prefix, baseDir,
                runtimeSeconds, threads);
        repro.go();
    }

    ///////////////////////////////////////////////////////////////////////////

    private final AmazonS3 s3;
    private final String s3Bucket;
    private final String s3Prefix;
    private final File baseDir;
    private final int runtimeSeconds;
    private final int threads;
    private final Random random = new Random();
    // Re-initialized at the start of each go() run. Counts size mismatches
    // AND download failures (the original counted only size mismatches,
    // which made the final "errors" total undercount real failures).
    private AtomicInteger errorCount;

    public ReproDiskConsistency(AmazonS3 s3, String s3Bucket, String s3Prefix, File baseDir,
                                int runtimeSeconds, int threads) {
        this.s3 = s3;
        this.s3Bucket = s3Bucket;
        this.s3Prefix = s3Prefix;
        this.baseDir = baseDir;
        this.runtimeSeconds = runtimeSeconds;
        this.threads = threads;
    }

    /**
     * Runs the test: streams S3 keys until the deadline passes (or keys are
     * exhausted), submitting one download/verify task per key to a bounded
     * pool. CallerRunsPolicy throttles submission when the queue fills, so
     * the key iterator cannot race far ahead of the workers.
     */
    public void go() throws InterruptedException {
        log.info("Starting test");
        this.errorCount = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 120, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(threads * 5), new ThreadPoolExecutor.CallerRunsPolicy());
        int fileCount = 0;
        // 1000L forces long arithmetic; the original int multiply could
        // overflow for very large runtimeSeconds values.
        long deadline = System.currentTimeMillis() + (runtimeSeconds * 1000L);
        Iterator<S3ObjectSummary> keys = s3Keys();
        while (keys.hasNext() && System.currentTimeMillis() < deadline) {
            S3ObjectSummary summary = keys.next();
            fileCount++;
            if (fileCount % 50 == 0) {
                log.info("fileCount: " + fileCount + " last key: " + summary.getKey());
            }
            pool.submit(() -> downloadAndVerify(summary));
        }
        pool.shutdown();
        // The original ignored the return value, silently reporting a
        // timed-out pool as a completed test.
        if (!pool.awaitTermination(10, TimeUnit.MINUTES)) {
            log.error("pool did not terminate within 10 minutes; results below may be incomplete");
        }
        log.info("Test Complete");
        log.info(" files downloaded: " + fileCount);
        log.info(" errors: " + errorCount.get());
    }

    /**
     * Returns an effectively endless iterator over S3 object summaries.
     * Each batch lists the first page of objects under a random two-level
     * hex prefix (e.g. "<s3Prefix>ab/cd/"); truncated listings are
     * intentionally not followed since only a random sample is needed.
     */
    private Iterator<S3ObjectSummary> s3Keys() {
        return new BaseBatchIterator<S3ObjectSummary>() {
            @Override
            protected List<S3ObjectSummary> loadNextBatch(S3ObjectSummary s3ObjectSummary) {
                // %08x zero-pads to 8 hex chars. The original used
                // Integer.toHexString, which returns fewer than 4 chars for
                // values in 0..0xFFF and made substring(2, 4) throw
                // StringIndexOutOfBoundsException.
                String randStr = String.format("%08x", random.nextInt());
                String prefix = s3Prefix + randStr.substring(0, 2) + "/" + randStr.substring(2, 4) + "/";
                ObjectListing listing = s3.listObjects(s3Bucket, prefix);
                log.info("Loaded " + listing.getObjectSummaries().size() + " keys from prefix: " + prefix);
                return listing.getObjectSummaries();
            }
        };
    }

    /**
     * Downloads one object to baseDir/<key>, fsyncs it, then compares the
     * on-disk file size with the size from the S3 listing. Any throwable is
     * logged and counted as an error rather than propagated, so one bad
     * object cannot kill a worker thread.
     */
    private void downloadAndVerify(S3ObjectSummary summary) {
        try {
            S3Object obj = s3.getObject(summary.getBucketName(), summary.getKey());
            File localFile = new File(baseDir, obj.getKey());
            com.imprev.soa.util.FileUtil.mkdirs(localFile.getParentFile());
            try (FileOutputStream fos = new FileOutputStream(localFile); InputStream is = obj.getObjectContent()) {
                Streams.copyStream(is, fos);
                fos.flush();
                // fsync so the size check below reads durable on-disk state,
                // not just the page cache.
                fos.getFD().sync();
            }
            long localSize = localFile.length();
            if (localSize != summary.getSize()) {
                sizeError(localSize, summary);
            }
        }
        catch (Throwable t) {
            // Count download failures toward the error total; the original
            // only logged them, so the final report undercounted errors.
            errorCount.incrementAndGet();
            log.error(t.getMessage(), t);
        }
    }

    /** Logs a size mismatch and increments the shared error counter. */
    private void sizeError(long localSize, S3ObjectSummary summary) {
        log.error("size mismatch: local=" + localSize + " s3=" + summary.getSize() + " key=" + summary.getKey());
        errorCount.incrementAndGet();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment