Skip to content

Instantly share code, notes, and snippets.

@shilad
Last active December 9, 2015 18:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shilad/cd7b8a51a137417705e5 to your computer and use it in GitHub Desktop.
Save shilad/cd7b8a51a137417705e5 to your computer and use it in GitHub Desktop.
Benchmark for Aeron messaging with variable numbers of channels
import uk.co.real_logic.aeron.Aeron;
import uk.co.real_logic.aeron.Publication;
import uk.co.real_logic.aeron.Subscription;
import uk.co.real_logic.aeron.driver.MediaDriver;
import uk.co.real_logic.aeron.driver.ThreadingMode;
import uk.co.real_logic.aeron.logbuffer.FragmentHandler;
import uk.co.real_logic.aeron.logbuffer.Header;
import uk.co.real_logic.agrona.DirectBuffer;
import uk.co.real_logic.agrona.concurrent.BusySpinIdleStrategy;
import uk.co.real_logic.agrona.concurrent.IdleStrategy;
import uk.co.real_logic.agrona.concurrent.UnsafeBuffer;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Throughput benchmark for Aeron messaging with a variable number of streams.
 *
 * <p>The benchmark launches an embedded media driver, then runs one publisher
 * thread and one subscriber thread inside the same JVM. The publisher offers
 * {@link #NUM_MESSAGES} messages of {@link #MESSAGE_BYTES} bytes each, picking
 * one of {@link #NUM_STREAMS} streams at random for every message; the
 * subscriber polls all streams until it has counted that many fragments and
 * then reports the elapsed time on standard error.
 *
 * <p>Timings on a quad-core 2.3 GHz i7 MacBook Pro:
 *
 * <pre>
 *   1 stream:    10.1M messages / second
 *   10 streams:   6.1M messages / second
 *   50 streams:   3.1M messages / second
 *   100 streams:  1.5M messages / second
 * </pre>
 *
 * @author Shilad Sen
 */
public class RawAeronBenchmark {
    /** Size in bytes of every message payload. */
    public static final int MESSAGE_BYTES = 40;

    /** Total number of messages published (and expected by the subscriber). */
    public static final int NUM_MESSAGES = 100000000;

    /** Number of streams (ids {@code 0 .. NUM_STREAMS - 1}) opened on {@link #PUB_URL}. */
    public static final int NUM_STREAMS = 1;

    /** Channel shared by every publication and subscription. */
    public static final String PUB_URL = "udp://localhost:30114";

    /** A progress line is printed every time this many messages have been read or written. */
    private static final int PROGRESS_INTERVAL = 1000000;

    /** Limit passed to each {@code Subscription.poll} call. */
    private static final int FRAGMENT_LIMIT = 1000;

    // Both fields are assigned in main() before the worker threads are started,
    // so the workers observe fully initialised values.
    private static MediaDriver driver;
    private static Aeron aeron;

    /**
     * Launches the embedded media driver and the Aeron client, runs the
     * subscriber and publisher threads to completion, and releases the client
     * and driver afterwards, even if start-up or waiting fails.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while
     *         waiting for the worker threads
     * @throws IOException declared for backward compatibility with existing callers
     */
    public static void main(String[] args) throws InterruptedException, IOException {
        MediaDriver.loadPropertiesFile("aeron-throughput.properties");

        MediaDriver.Context driverContext = new MediaDriver.Context();
        driverContext.threadingMode(ThreadingMode.DEDICATED);

        try {
            driver = MediaDriver.launchEmbedded(driverContext);

            Aeron.Context aeronContext = new Aeron.Context();
            aeronContext.aeronDirectoryName(driver.aeronDirectoryName());
            aeron = Aeron.connect(aeronContext);

            Thread subscriberThread = new Thread("aeron-subscriber") {
                @Override
                public void run() {
                    try {
                        runSubscriber();
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible instead of silently dropping it.
                        Thread.currentThread().interrupt();
                    }
                }
            };
            Thread publisherThread = new Thread("aeron-publisher") {
                @Override
                public void run() {
                    try {
                        runPublisher();
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible instead of silently dropping it.
                        Thread.currentThread().interrupt();
                    }
                }
            };

            // The subscriber is started first, as in the original ordering.
            subscriberThread.start();
            publisherThread.start();
            subscriberThread.join();
            publisherThread.join();
        } finally {
            // Close the client before the driver; nested so that a failure while
            // closing the client cannot prevent the driver from being closed.
            try {
                if (aeron != null) {
                    aeron.close();
                }
            } finally {
                if (driver != null) {
                    driver.close();
                }
            }
        }
    }

    /**
     * Subscribes to every stream and busy-polls them round-robin until
     * {@link #NUM_MESSAGES} fragments have been counted, printing a progress
     * line every {@code PROGRESS_INTERVAL} fragments and the total elapsed
     * time in milliseconds at the end.
     *
     * <p>The clock starts once the subscriptions have been added, so the
     * reported time includes any wait for the first message to arrive.
     *
     * @throws InterruptedException declared for backward compatibility with existing callers
     */
    public static void runSubscriber() throws InterruptedException {
        final AtomicLong numReads = new AtomicLong();
        FragmentHandler handler = new FragmentHandler() {
            @Override
            public void onFragment(DirectBuffer buffer, int offset, int length, Header header) {
                // The payload is ignored; only the number of fragments matters.
                long n = numReads.getAndIncrement();
                if (n % PROGRESS_INTERVAL == 0) {
                    System.err.println("read message " + n);
                }
            }
        };

        Subscription[] subs = new Subscription[NUM_STREAMS];
        for (int i = 0; i < NUM_STREAMS; i++) {
            subs[i] = aeron.addSubscription(PUB_URL, i);
        }

        // nanoTime is monotonic, so the measurement is immune to wall-clock adjustments.
        long startNanos = System.nanoTime();
        IdleStrategy idler = new BusySpinIdleStrategy();
        while (numReads.get() < NUM_MESSAGES) {
            int fragmentsRead = 0;
            for (int i = 0; i < NUM_STREAMS; i++) {
                fragmentsRead += subs[i].poll(handler, FRAGMENT_LIMIT);
            }
            // Let the idle strategy know how much work this pass accomplished.
            idler.idle(fragmentsRead);
        }
        long elapsedMillis = (System.nanoTime() - startNanos) / 1000000L;
        System.err.println("Elapsed milliseconds for " + NUM_MESSAGES + " messages is " + elapsedMillis);
    }

    /**
     * Publishes {@link #NUM_MESSAGES} messages, each to a randomly chosen
     * stream, retrying an offer for as long as it returns a negative value.
     * A progress line with the cumulative number of offer attempts is printed
     * every {@code PROGRESS_INTERVAL} successful writes.
     *
     * @throws InterruptedException declared for backward compatibility with existing callers
     */
    public static void runPublisher() throws InterruptedException {
        Publication[] pubs = new Publication[NUM_STREAMS];
        for (int i = 0; i < NUM_STREAMS; i++) {
            pubs[i] = aeron.addPublication(PUB_URL, i);
        }

        Random rand = new Random();
        // One buffer is reused for every message; its contents are never modified.
        UnsafeBuffer buff = new UnsafeBuffer(new byte[MESSAGE_BYTES]);
        long attempts = 0;
        long writes = 0;
        for (int i = 0; i < NUM_MESSAGES; i++) {
            Publication p = pubs[rand.nextInt(NUM_STREAMS)];
            // A negative result means the message was not accepted: spin and retry.
            do {
                attempts++;
            } while (p.offer(buff) < 0);
            writes++;
            if (writes % PROGRESS_INTERVAL == 0) {
                System.err.println("wrote message " + writes + " in " + attempts + " attempts");
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment