Skip to content

Instantly share code, notes, and snippets.

@robshep
Last active December 11, 2015 22:19
Show Gist options
  • Save robshep/4669110 to your computer and use it in GitHub Desktop.
Save robshep/4669110 to your computer and use it in GitHub Desktop.
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import journal.io.api.Journal;
import journal.io.api.Journal.ReadType;
import journal.io.api.Journal.WriteType;
import journal.io.api.Location;
/**
 * Small stress harness for a journal.io {@link Journal}.
 *
 * <p>Producer threads append numbered records to the journal and hand the returned
 * {@link Location} of each record to a fixed pool of consumers, which read the record
 * back and delete it. {@link #run()} reports how long production and consumption took.
 *
 * <p>Completion is tracked with two latches: {@code pFinish} is released once per
 * producer thread, {@code cFinish} once per record (whether the record was consumed or
 * failed on the way), so {@link #run()} only proceeds when all work is accounted for.
 */
public class JIOTest
{
    final Journal journal;
    final int numProducers;
    final ExecutorService executer;
    /** Records still to be produced; producers claim one by decrementing it. */
    final AtomicInteger countdown;
    /** Reaches zero when every producer thread has exited. */
    final CountDownLatch pFinish;
    /** Reaches zero when every record has been consumed or has failed. */
    final CountDownLatch cFinish;
    /** Guarded by {@code this}; time at which the most recent producer exited. */
    private Long producerFinishTime;
    /** Guarded by {@code this}; time at which the most recent consumer task ended. */
    private Long consumerFinishTime;
    private final byte[] payload;

    /**
     * Opens the journal under {@code /tmp/journal-io-delete-test} and prepares the test.
     *
     * @param numRecords         number of records to write; values below zero are treated as zero
     * @param numProducers       number of producer threads, at least 1
     * @param numConsumers       size of the consumer thread pool, at least 1
     * @param payloadSize        bytes of filler appended to each record; values below zero are treated as zero
     * @param alwaysPhysicalSync passed to {@link Journal#setPhysicalSync(boolean)}
     * @throws IllegalArgumentException if {@code numProducers} or {@code numConsumers} is below 1
     * @throws IOException              if the journal cannot be opened
     */
    public JIOTest(final int numRecords,
                   final int numProducers,
                   final int numConsumers,
                   final int payloadSize,
                   final boolean alwaysPhysicalSync ) throws IOException
    {
        // Validate before touching any resource so a bad argument cannot leave an open journal behind.
        if (numProducers < 1 || numConsumers < 1) {
            throw new IllegalArgumentException(
                    "numProducers and numConsumers must be >= 1 (got " + numProducers + ", " + numConsumers + ")");
        }
        final int records = Math.max(numRecords, 0);

        this.numProducers = numProducers;
        this.payload = new byte[Math.max(payloadSize, 0)];
        this.countdown = new AtomicInteger(records);
        this.pFinish = new CountDownLatch(numProducers);
        this.cFinish = new CountDownLatch(records);
        this.executer = Executors.newFixedThreadPool(numConsumers);

        File journalDir = new File("/tmp/journal-io-delete-test");
        journalDir.mkdirs();
        journal = new Journal();
        journal.setDirectory(journalDir);
        journal.setPhysicalSync(alwaysPhysicalSync);
        journal.open();
    }

    /**
     * Starts the producers, waits until every producer has exited and every record has
     * been settled, prints the timings in whole seconds, compacts the journal and exits the JVM.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws IOException          if compacting or closing the journal fails
     */
    public void run() throws InterruptedException, IOException
    {
        final long startTime = System.currentTimeMillis();
        try
        {
            for (int i = 0; i < numProducers; i++)
            {
                new QProducer().start();
            }
            System.out.print("Running...");
            pFinish.await();
            System.out.println(" ... ");
            cFinish.await();

            final long producedAt;
            final long consumedAt;
            synchronized (this)
            {
                producedAt = producerFinishTime;
                // No consumer ever ran (zero records, or every write failed): report zero lag.
                consumedAt = consumerFinishTime != null ? consumerFinishTime : producedAt;
            }
            System.out.println("Time to complete production: " + (producedAt - startTime)/1000 );
            System.out.println("Time to complete consumption: " + (consumedAt - startTime)/1000 );
            System.out.println("Consumption/Production Lag: " + (consumedAt - producedAt)/1000 );
            journal.compact();
        }
        finally
        {
            // Always release the pool and the journal, including on interruption or a failed compact.
            executer.shutdownNow();
            journal.close();
        }
        // NOTE(review): presumably needed because non-daemon threads would otherwise keep the JVM alive - confirm.
        System.exit(0);
    }

    public static void main(String[] args) throws IOException, InterruptedException
    {
        new JIOTest(1000, 1, 1, 50, false).run();
    }

    /** Allocates a buffer sized for one record: a 4-byte int sequence number followed by the payload. */
    private ByteBuffer getBuffer(){
        return ByteBuffer.allocate(4 + payload.length );
    }

    /**
     * Producer thread: claims sequence numbers from {@code countdown} until it is exhausted,
     * writes one record per number and submits a {@link QConsumer} for the written location.
     */
    public class QProducer extends Thread
    {
        @Override
        public void run() {
            try
            {
                int n;
                while( (n = countdown.decrementAndGet() ) >= 0 )
                {
                    try {
                        Location loc = journal.write( getBuffer().putInt(n).put(payload).array(), WriteType.SYNC);
                        executer.submit(new QConsumer(loc));
                    }
                    catch (Exception e) {
                        System.out.println("Exception writing: " + n + " - " + e.getMessage() );
                        // No consumer task exists for this record, so settle it here;
                        // otherwise run() would wait on cFinish forever.
                        cFinish.countDown();
                    }
                }
            }
            finally
            {
                // Release the producer latch even if this thread dies unexpectedly.
                setProducersFinish();
            }
        }
    }

    /**
     * Consumer task: reads the record at the given location back from the journal and deletes it.
     */
    public class QConsumer implements Callable<Void>
    {
        final Location location;

        public QConsumer(Location location){
            // Work on a private copy of the location handed over by the producer.
            this.location = new Location(location);
        }

        @Override
        public Void call() throws Exception {
            try {
                journal.read(location, ReadType.SYNC);
                journal.delete(location);
            }
            catch (Exception e) {
                // The Future returned by submit() is never inspected, so report the failure here.
                System.out.println("Exception consuming: " + location + " - " + e.getMessage() );
                throw e;
            }
            finally {
                // Settle the record whether or not it was consumed successfully.
                setConsumerFinishTime();
            }
            return null;
        }
    }

    /**
     * Records that one producer thread has exited. The stored time is overwritten on every
     * call, so after the last call it holds the moment the final producer finished.
     */
    public synchronized void setProducersFinish() {
        producerFinishTime = System.currentTimeMillis();
        pFinish.countDown();
    }

    /**
     * Records that one consumer task has ended. The stored time is overwritten on every
     * call, so after the last call it holds the moment the final consumer task ended.
     */
    public synchronized void setConsumerFinishTime() {
        this.consumerFinishTime = System.currentTimeMillis();
        cFinish.countDown();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment