Skip to content

Instantly share code, notes, and snippets.

@vinothchandar
Created July 3, 2013 21:41
Show Gist options
  • Save vinothchandar/5923088 to your computer and use it in GitHub Desktop.
Save vinothchandar/5923088 to your computer and use it in GitHub Desktop.
GCFriendlyCleaner implementation using NIO Zero Copy
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Benchmark task that writes a data file of random records and then repeatedly copies it,
 * record by record, into a "migrated" file. Record headers and keys go through small reusable
 * buffers; value payloads are moved with {@link FileChannel#transferTo}.
 *
 * <p>On-disk record layout, in order: {@value #POINTER_SIZE}-byte address, 2-byte big-endian key
 * length, key bytes, 2-byte big-endian value length, value bytes.
 *
 * <p>Each instance owns its own buffers and {@link Random}; use one instance per thread.
 *
 * <p>Example invocation:
 *
 * java -Xms4g -Xmx4g -XX:NewSize=1024m -XX:MaxNewSize=1024m -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSInitiatingOccupancyFraction=70 -XX:SurvivorRatio=2 -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -Xloggc:cleanergc.log -XX:+PrintGCTimeStamps GCFriendlyCleaner
 */
public class GCFriendlyCleaner implements Runnable {

    /** Size in bytes of the address field that starts every record. */
    static final int POINTER_SIZE = 8;

    /**
     * Capacity of the reusable key buffer. Key lengths are stored in two bytes, so keys read
     * back from a file never exceed {@link #MAX_UNSIGNED_SHORT} bytes.
     */
    static final int MAX_KEY_SIZE = 2 * 1024 * 1024;

    /** Largest length representable by the 2-byte size prefixes. */
    private static final int MAX_UNSIGNED_SHORT = 0xFFFF;

    Random random;
    ByteBuffer addressBuffer;
    ByteBuffer keyBuffer;
    ByteBuffer keySizeBuffer;
    ByteBuffer valueSizeBuffer;
    String filePrefix;
    int fileSizeInBytes;
    int keySize;
    int minValueSize;
    int maxValueSize;
    int totalCleanerIterations;
    CountDownLatch latch;

    /**
     * Creates a cleaner task.
     *
     * @param filePrefix prefix of the data and migrated file names
     * @param fileSizeInBytes minimum number of bytes to generate into the data file
     * @param keySize size in bytes of every generated key
     * @param minValueSize smallest generated value size (inclusive)
     * @param maxValueSize upper bound of generated value sizes (exclusive, unless equal to
     *     {@code minValueSize}, in which case every value has exactly that size)
     * @param totalCleanerIterations how many times {@link #run()} copies the data file
     * @param latch counted down once when {@link #run()} finishes, successfully or not
     */
    public GCFriendlyCleaner(String filePrefix, int fileSizeInBytes, int keySize, int minValueSize, int maxValueSize, int totalCleanerIterations, CountDownLatch latch) {
        this.random = new Random();
        this.addressBuffer = ByteBuffer.allocate(POINTER_SIZE);
        this.keyBuffer = ByteBuffer.allocate(MAX_KEY_SIZE);
        this.keySizeBuffer = ByteBuffer.allocate(2);
        this.valueSizeBuffer = ByteBuffer.allocate(2);
        this.filePrefix = filePrefix;
        this.fileSizeInBytes = fileSizeInBytes;
        this.keySize = keySize;
        this.minValueSize = minValueSize;
        this.maxValueSize = maxValueSize;
        this.totalCleanerIterations = totalCleanerIterations;
        this.latch = latch;
    }

    /**
     * Reads a big-endian unsigned 16-bit value using absolute gets (the buffer position is not
     * changed).
     *
     * @param bytes buffer to read from
     * @param offset index of the high-order byte
     * @return the value in the range 0..65535
     */
    public static int readUnsignedShort(ByteBuffer bytes, int offset) {
        int high = bytes.get(offset) & 0xff;
        int low = bytes.get(offset + 1) & 0xff;
        return (high << 8) | low;
    }

    /**
     * Writes the low 16 bits of {@code value} into {@code bytes} in big-endian order.
     *
     * @param bytes destination array
     * @param value value to encode; only the low 16 bits are stored
     * @param offset index that receives the high-order byte
     */
    public static void writeUnsignedShort(byte[] bytes, int value, int offset) {
        bytes[offset] = (byte) (0xFF & (value >> 8));
        bytes[offset + 1] = (byte) (0xFF & value);
    }

    /**
     * Ensures an empty file exists at {@code filePath}, replacing any existing file.
     *
     * @param filePath path of the file to (re)create
     * @return the freshly created file
     * @throws Exception an {@link IOException} if the old file cannot be deleted or the new one
     *     cannot be created
     */
    public static File deleteAndCreateFile(String filePath) throws Exception {
        File file = new File(filePath);
        if (file.exists() && !file.delete()) {
            throw new IOException("Could not delete existing file " + file);
        }
        if (!file.createNewFile()) {
            throw new IOException("Could not create file " + file);
        }
        return file;
    }

    /**
     * Fills {@code dataFile} with random records until at least {@code fileSizeInBytes} bytes
     * have been written.
     *
     * @param dataFile file to write; existing content is overwritten
     * @return number of records written
     * @throws Exception an {@link IllegalArgumentException} if the configured key/value sizes do
     *     not fit the 2-byte length prefixes, or an {@link IOException} on write failure
     */
    public int generateFile(File dataFile) throws Exception {
        // Validate before touching the file so a bad configuration leaves nothing half-written.
        checkRecordSizes();

        // Fixed-size parts are allocated once and refilled per record.
        byte[] addressPtr = new byte[POINTER_SIZE];
        byte[] keyBlockSize = new byte[2];
        byte[] keyBlock = new byte[keySize];
        byte[] valueBlockSize = new byte[2];

        // long: the last record may push the total past Integer.MAX_VALUE for large targets.
        long writtenBytes = 0;
        int recordsCount = 0;
        try (FileOutputStream fos = new FileOutputStream(dataFile)) {
            while (writtenBytes < fileSizeInBytes) {
                byte[] valueBlock = new byte[nextValueSize()];
                random.nextBytes(addressPtr);
                random.nextBytes(keyBlock);
                random.nextBytes(valueBlock);

                fos.write(addressPtr);
                writeUnsignedShort(keyBlockSize, keyBlock.length, 0);
                fos.write(keyBlockSize);
                fos.write(keyBlock);
                writeUnsignedShort(valueBlockSize, valueBlock.length, 0);
                fos.write(valueBlockSize);
                fos.write(valueBlock);

                writtenBytes += POINTER_SIZE + 2L + keyBlock.length + 2L + valueBlock.length;
                recordsCount++;
            }
            fos.flush();
        }
        return recordsCount;
    }

    /**
     * Copies every record of {@code dataFile} into {@code migrateFile}.
     *
     * <p>The address, key length, key and value length are read into this instance's reusable
     * buffers and written out again; the value payload is handed to
     * {@link FileChannel#transferTo} so it is never pulled into a Java buffer by this code.
     *
     * @param dataFile source file in the record layout described on the class
     * @param migrateFile destination file; existing content is overwritten
     * @throws Exception an {@link EOFException} if the source ends in the middle of a record,
     *     or another {@link IOException} on I/O failure
     */
    public void cleanFile(File dataFile, File migrateFile) throws Exception {
        try (FileInputStream fis = new FileInputStream(dataFile);
                FileChannel dataFileChannel = fis.getChannel();
                FileOutputStream fos = new FileOutputStream(migrateFile);
                FileChannel migrateFileChannel = fos.getChannel()) {
            while (dataFileChannel.position() < dataFileChannel.size()) {
                addressBuffer.clear();
                keySizeBuffer.clear();
                keyBuffer.clear();
                valueSizeBuffer.clear();

                // Read the record header and key.
                readFully(dataFileChannel, addressBuffer);
                readFully(dataFileChannel, keySizeBuffer);
                int recordKeySize = readUnsignedShort(keySizeBuffer, 0);
                keyBuffer.limit(recordKeySize);
                readFully(dataFileChannel, keyBuffer);
                readFully(dataFileChannel, valueSizeBuffer);
                int valueSize = readUnsignedShort(valueSizeBuffer, 0);

                // Migrate the header and key.
                addressBuffer.flip();
                keySizeBuffer.flip();
                keyBuffer.flip();
                valueSizeBuffer.flip();
                writeFully(migrateFileChannel, addressBuffer);
                writeFully(migrateFileChannel, keySizeBuffer);
                writeFully(migrateFileChannel, keyBuffer);
                writeFully(migrateFileChannel, valueSizeBuffer);

                // transferTo does not move the source position, so skip the value explicitly.
                long valueStart = dataFileChannel.position();
                transferFully(dataFileChannel, valueStart, valueSize, migrateFileChannel);
                dataFileChannel.position(valueStart + valueSize);
            }
        }
    }

    /**
     * Generates the data file, then copies it {@code totalCleanerIterations} times, printing the
     * record and byte throughput of each pass. Any exception is reported on standard error; the
     * latch is counted down in all cases.
     */
    @Override
    public void run() {
        try {
            File dataFile = deleteAndCreateFile(filePrefix + "-data-java.mmi");
            int numRecords = generateFile(dataFile);
            long dataFileSize = dataFile.length();
            System.out.println("Records written "+ numRecords+" Size :"+ dataFileSize);
            for (int cleanerIteration = 0; cleanerIteration < totalCleanerIterations; cleanerIteration++) {
                File migrateFile = deleteAndCreateFile(filePrefix + "-migrated-java.mmi");
                long startMs = System.currentTimeMillis();
                cleanFile(dataFile, migrateFile);
                long stopMs = System.currentTimeMillis();
                double elapsedSeconds = (stopMs - startMs) / 1000.0;
                System.out.println(this.filePrefix + " Iteration : "+cleanerIteration+", Cleaning Rate :"+ (numRecords / elapsedSeconds));
                System.out.println(this.filePrefix + " Iteration : "+cleanerIteration+", Cleaning bytes/sec :"+ (dataFileSize / elapsedSeconds));
            }
        } catch (Exception e) {
            System.err.println("Error in cleaner "+ filePrefix);
            e.printStackTrace();
        } finally {
            latch.countDown();
        }
    }

    /**
     * Runs {@code numthreads} cleaners concurrently and waits for all of them to finish.
     *
     * @param args {@code <totalFileSize> <keysize> <minvaluesize> <maxvaluesize> <iterations>
     *     <numthreads>}
     * @throws Exception if an argument is not an integer or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 6) {
            System.out.println("Usage: java GCFriendlyCleaner <totalFileSize> <keysize> <minvaluesize> <maxvaluesize> <iterations> <numthreads>");
            System.exit(1);
        }
        int totalFileSize = Integer.parseInt(args[0]);
        int keySizeArg = Integer.parseInt(args[1]);
        int minValueSizeArg = Integer.parseInt(args[2]);
        int maxValueSizeArg = Integer.parseInt(args[3]);
        int iterations = Integer.parseInt(args[4]);
        int numThreads = Integer.parseInt(args[5]);

        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        try {
            CountDownLatch latch = new CountDownLatch(numThreads);
            for (int i = 0; i < numThreads; i++) {
                executor.submit(new GCFriendlyCleaner("cleaner-"+i, totalFileSize, keySizeArg,
                        minValueSizeArg, maxValueSizeArg, iterations, latch));
            }
            latch.await();
        } finally {
            // Shut down even if await() is interrupted so the pool threads do not keep the JVM alive.
            executor.shutdown();
        }
    }

    /** Rejects key/value sizes that cannot be encoded in the 2-byte length prefixes. */
    private void checkRecordSizes() {
        if (keySize < 0 || keySize > MAX_UNSIGNED_SHORT) {
            throw new IllegalArgumentException(
                    "keySize must be between 0 and " + MAX_UNSIGNED_SHORT + ", got " + keySize);
        }
        if (minValueSize < 0 || maxValueSize < minValueSize) {
            throw new IllegalArgumentException(
                    "Need 0 <= minValueSize <= maxValueSize, got " + minValueSize + " and " + maxValueSize);
        }
        int largestValueSize = Math.max(minValueSize, maxValueSize - 1);
        if (largestValueSize > MAX_UNSIGNED_SHORT) {
            throw new IllegalArgumentException(
                    "Value sizes must not exceed " + MAX_UNSIGNED_SHORT + ", got up to " + largestValueSize);
        }
    }

    /** Picks a value size in [minValueSize, maxValueSize), or exactly minValueSize when both are equal. */
    private int nextValueSize() {
        int spread = maxValueSize - minValueSize;
        // Random.nextInt(bound) requires bound > 0.
        return spread > 0 ? minValueSize + random.nextInt(spread) : minValueSize;
    }

    /** Reads from the channel until the buffer is full, failing if the file ends first. */
    private static void readFully(FileChannel channel, ByteBuffer buffer) throws IOException {
        while (buffer.hasRemaining()) {
            if (channel.read(buffer) < 0) {
                throw new EOFException("Unexpected end of file: " + buffer.remaining()
                        + " byte(s) of the current record are missing");
            }
        }
    }

    /** Writes the buffer's remaining bytes to the channel, retrying after partial writes. */
    private static void writeFully(FileChannel channel, ByteBuffer buffer) throws IOException {
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
    }

    /** Transfers exactly {@code count} bytes starting at {@code position}, retrying after partial transfers. */
    private static void transferFully(FileChannel source, long position, long count, FileChannel target) throws IOException {
        long from = position;
        long remaining = count;
        while (remaining > 0) {
            long moved = source.transferTo(from, remaining, target);
            if (moved <= 0) {
                // No progress on a file source: the requested range runs past the end of the file.
                throw new EOFException("Unexpected end of file: " + remaining
                        + " byte(s) of the current value are missing");
            }
            from += moved;
            remaining -= moved;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment