Created
July 3, 2013 21:41
-
-
Save vinothchandar/5923088 to your computer and use it in GitHub Desktop.
GCFriendlyCleaner implementation using NIO Zero Copy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/** | |
* | |
* java -Xms4g -Xmx4g -XX:NewSize=1024m -XX:MaxNewSize=1024m -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSInitiatingOccupancyFraction=70 -XX:SurvivorRatio=2 -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -Xloggc:cleanergc.log -XX:+PrintGCTimeStamps GCFriendlyCleaner | |
* | |
*/ | |
public class GCFriendlyCleaner implements Runnable { | |
public static int readUnsignedShort(ByteBuffer bytes, int offset) { | |
return (((bytes.get(offset) & 0xff) << 8) | (bytes.get(offset + 1) & 0xff)); | |
} | |
public static void writeUnsignedShort(byte[] bytes, int value, int offset) { | |
bytes[offset] = (byte) (0xFF & (value >> 8)); | |
bytes[offset + 1] = (byte) (0xFF & value); | |
} | |
public static File deleteAndCreateFile(String filePath) throws Exception { | |
File file = new File(filePath); | |
if (file.exists()) | |
file.delete(); | |
file.createNewFile(); | |
return file; | |
} | |
static final int POINTER_SIZE = 8; | |
static final int MAX_KEY_SIZE = 2 * 1024 * 1024; | |
Random random; | |
ByteBuffer addressBuffer; | |
ByteBuffer keyBuffer; | |
ByteBuffer keySizeBuffer; | |
ByteBuffer valueSizeBuffer; | |
String filePrefix; | |
int fileSizeInBytes; | |
int keySize; | |
int minValueSize; | |
int maxValueSize; | |
int totalCleanerIterations; | |
CountDownLatch latch; | |
public GCFriendlyCleaner(String filePrefix, int fileSizeInBytes, int keySize, int minValueSize, int maxValueSize, int totalCleanerIterations, CountDownLatch latch) { | |
random = new Random(); | |
addressBuffer = ByteBuffer.allocate(POINTER_SIZE); | |
keyBuffer = ByteBuffer.allocate(MAX_KEY_SIZE); | |
keySizeBuffer = ByteBuffer.allocate(2); | |
valueSizeBuffer = ByteBuffer.allocate(2); | |
this.fileSizeInBytes = fileSizeInBytes; | |
this.keySize = keySize; | |
this.minValueSize = minValueSize; | |
this.maxValueSize = maxValueSize; | |
this.totalCleanerIterations = totalCleanerIterations; | |
this.latch = latch; | |
this.filePrefix = filePrefix; | |
} | |
public int generateFile(File dataFile) throws Exception { | |
int writtenBytes = 0; | |
int recordsCount = 0; | |
FileOutputStream fos = new FileOutputStream(dataFile); | |
while (writtenBytes < fileSizeInBytes){ | |
byte[] addressPtr = new byte[POINTER_SIZE]; | |
byte[] keyBlockSize = new byte[2]; | |
byte[] keyBlock = new byte[keySize]; | |
byte[] valueBlockSize = new byte[2]; | |
byte[] valueBlock = new byte[minValueSize + random.nextInt(maxValueSize-minValueSize)]; | |
random.nextBytes(addressPtr); | |
random.nextBytes(keyBlock); | |
random.nextBytes(valueBlock); | |
fos.write(addressPtr); | |
writtenBytes += POINTER_SIZE; | |
writeUnsignedShort(keyBlockSize, keyBlock.length, 0); | |
fos.write(keyBlockSize); | |
writtenBytes += 2; | |
fos.write(keyBlock); | |
writtenBytes += keyBlock.length; | |
writeUnsignedShort(valueBlockSize, valueBlock.length, 0); | |
fos.write(valueBlockSize); | |
writtenBytes += 2; | |
fos.write(valueBlock); | |
writtenBytes += valueBlock.length; | |
//System.out.println("WROTE record "+ keyBlock.length + ","+ valueBlock.length); | |
recordsCount++; | |
} | |
fos.flush(); | |
fos.close(); | |
return recordsCount; | |
} | |
/** | |
* 1. Lower memory requirements. | |
* 2. Value migration can be zero copied.. | |
* 3. No perf degradation since random reads are almost as fast as seq ones on ssds | |
* | |
* @param dataFile | |
* @param migrateFile | |
* @throws Exception | |
*/ | |
public void cleanFile(File dataFile, File migrateFile) throws Exception { | |
FileInputStream fis = new FileInputStream(dataFile); | |
FileChannel dataFileChannel = fis.getChannel(); | |
FileOutputStream fos = new FileOutputStream(migrateFile); | |
FileChannel migrateFileChannel = fos.getChannel(); | |
while(dataFileChannel.position() < dataFileChannel.size()){ | |
keyBuffer.clear(); | |
addressBuffer.clear(); | |
keySizeBuffer.clear(); | |
valueSizeBuffer.clear(); | |
// read a record.. | |
int bytesReadAddr = dataFileChannel.read(addressBuffer); | |
int bytesReadKeySize = dataFileChannel.read(keySizeBuffer); | |
int keySize = readUnsignedShort(keySizeBuffer, 0); | |
keyBuffer.limit(keySize); | |
int bytesReadKey = dataFileChannel.read(keyBuffer); | |
int bytesReadValueSize = dataFileChannel.read(valueSizeBuffer); | |
int valueSize = readUnsignedShort(valueSizeBuffer, 0); | |
/*System.out.printf("File Pos %d , addr %d, keysz %d, key %d, valuesz %d value %d\n", | |
dataFileChannel.position(), | |
bytesReadAddr, | |
bytesReadKeySize, | |
bytesReadKey, | |
bytesReadValueSize, | |
bytesReadValue);*/ | |
// migrate the record.. | |
addressBuffer.flip(); | |
keySizeBuffer.flip(); | |
keyBuffer.flip(); | |
valueSizeBuffer.flip(); | |
migrateFileChannel.write(addressBuffer); | |
migrateFileChannel.write(keySizeBuffer); | |
migrateFileChannel.write(keyBuffer); | |
migrateFileChannel.write(valueSizeBuffer); | |
dataFileChannel.transferTo(dataFileChannel.position(), valueSize, migrateFileChannel); | |
dataFileChannel.position(dataFileChannel.position() + valueSize); | |
//System.out.println("READ record "+ keySize + ","+ valueSize); | |
} | |
fis.close(); | |
fos.close(); | |
} | |
public void run() { | |
try { | |
File dataFile = deleteAndCreateFile(filePrefix+ "-data-java.mmi"); | |
int numRecords = generateFile(dataFile); | |
int dataFileSize = (int) dataFile.length(); | |
System.out.println("Records written "+ numRecords+" Size :"+ dataFileSize); | |
for (int cleanerIteration=0; cleanerIteration < totalCleanerIterations; cleanerIteration++){ | |
File migrateFile = deleteAndCreateFile(filePrefix+ "-migrated-java.mmi"); | |
long startMs = System.currentTimeMillis(); | |
cleanFile(dataFile,migrateFile); | |
long stopMs = System.currentTimeMillis(); | |
System.out.println(this.filePrefix + " Iteration : "+cleanerIteration+", Cleaning Rate :"+ (numRecords/((stopMs-startMs)/1000.0))); | |
System.out.println(this.filePrefix + " Iteration : "+cleanerIteration+", Cleaning bytes/sec :"+ (dataFileSize/((stopMs-startMs)/1000.0))); | |
} | |
} catch (Exception e){ | |
System.err.println("Error in cleaner "+ filePrefix); | |
e.printStackTrace(); | |
} finally { | |
latch.countDown(); | |
} | |
} | |
/** | |
* @param args | |
*/ | |
public static void main(String[] args) throws Exception { | |
if (args.length != 6){ | |
System.out.println("Usage: java GCFriendlyCleaner <totalFileSize> <keysize> <minvaluesize> <maxvaluesize> <iterations> <numthreads>"); | |
System.exit(1); | |
} | |
int numThreads = Integer.parseInt(args[5]); | |
ExecutorService executor = Executors.newFixedThreadPool(numThreads); | |
CountDownLatch latch = new CountDownLatch(numThreads); | |
for (int i=0; i< numThreads; i++){ | |
GCFriendlyCleaner cleaner = new GCFriendlyCleaner("cleaner-"+i, Integer.parseInt(args[0]), | |
Integer.parseInt(args[1]), Integer.parseInt(args[2]), Integer.parseInt(args[3]), Integer.parseInt(args[4]), latch); | |
executor.submit(cleaner); | |
//cleaner.run(); | |
} | |
latch.await(); | |
executor.shutdown(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment