Skip to content

Instantly share code, notes, and snippets.

@gresrun
Last active January 13, 2021 18:15
Show Gist options
  • Save gresrun/c5a2479c3f6287ccb799 to your computer and use it in GitHub Desktop.
Save gresrun/c5a2479c3f6287ccb799 to your computer and use it in GitHub Desktop.
Simple Micro-benchmark to Test Jesque Job Throughput
package net.greghaines.jesque.perftest;
import static net.greghaines.jesque.utils.JesqueUtils.entry;
import static net.greghaines.jesque.utils.JesqueUtils.map;
import static net.greghaines.jesque.worker.WorkerEvent.JOB_SUCCESS;
import static net.greghaines.jesque.worker.WorkerEvent.WORKER_START;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import net.greghaines.jesque.Config;
import net.greghaines.jesque.ConfigBuilder;
import net.greghaines.jesque.Job;
import net.greghaines.jesque.client.Client;
import net.greghaines.jesque.client.ClientImpl;
import net.greghaines.jesque.worker.MapBasedJobFactory;
import net.greghaines.jesque.worker.Worker;
import net.greghaines.jesque.worker.WorkerEvent;
import net.greghaines.jesque.worker.WorkerImpl;
import net.greghaines.jesque.worker.WorkerListener;
import net.greghaines.jesque.worker.WorkerPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Micro-benchmark that measures how quickly a pool of {@code NUM_WORKERS} Jesque workers drains a
 * queue that has been pre-filled with no-op jobs.
 * <p>
 * The queue is filled completely before the pool is started; the clock starts on the first
 * {@code WORKER_START} event and stops when the last expected {@code JOB_SUCCESS} event arrives.
 */
public class MultiPerfTest {

    private static final Logger LOG = LoggerFactory.getLogger(MultiPerfTest.class);
    private static final Config CONFIG = new ConfigBuilder().build();
    private static final String TEST_QUEUE = "foo";
    /** Number of jobs pushed by each enqueuer. */
    private static final int JOB_COUNT = 100000;
    /** Number of enqueuer passes executed before the workers start. */
    private static final int NUM_ENQUEUERS = 1;
    private static final int NUM_WORKERS = 8;
    /** Total number of jobs the workers must complete before the clock is stopped. */
    private static final int TOTAL_JOBS = JOB_COUNT * NUM_ENQUEUERS;
    /** Upper bound passed to the pool's join; computed as a long so a larger JOB_COUNT cannot overflow. */
    private static final long JOIN_TIMEOUT_MILLIS = JOB_COUNT * 100L;

    /**
     * Fills the test queue, runs the worker pool and waits for it to finish.
     *
     * @param args ignored
     */
    public static void main(final String... args) {
        LOG.info("Starting test...");
        final WorkerPool workerPool = new WorkerPool(new Callable<Worker>() {
            @Override
            public Worker call() {
                return new WorkerImpl(CONFIG, Arrays.asList(TEST_QUEUE),
                        new MapBasedJobFactory(map(entry("TestAction", TestAction.class))));
            }
        }, NUM_WORKERS);
        workerPool.getWorkerEventEmitter().addListener(new TimerWorkerListener(workerPool),
                WORKER_START, JOB_SUCCESS);
        // Enqueue exactly as many jobs as the listener counts down from; otherwise the
        // countdown never reaches zero and the pool is never told to end.
        for (int i = 0; i < NUM_ENQUEUERS; i++) {
            new Enqueuer().run();
        }
        LOG.info("Enqueue complete!");
        workerPool.run();
        try {
            workerPool.join(JOIN_TIMEOUT_MILLIS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            LOG.warn("Exception while waiting for workerPool to join", e);
        }
        LOG.info("Test complete!");
    }

    /** Pushes {@code JOB_COUNT} copies of the same {@code TestAction} job onto the test queue. */
    private static class Enqueuer implements Runnable {

        @Override
        public void run() {
            final Client client = new ClientImpl(CONFIG);
            try {
                final Job job = new Job("TestAction",
                        new Object[] { 1, 2.3, true, "test", Arrays.asList("inner", 4.5) });
                for (int i = 0; i < JOB_COUNT; i++) {
                    client.enqueue(TEST_QUEUE, job);
                }
            } finally {
                // Always release the client, even if an enqueue fails part-way through.
                client.end();
            }
        }
    }

    /**
     * Starts the clock on the first worker start, counts successful jobs and, once all expected
     * jobs have succeeded, logs the throughput and ends the pool.
     * <p>
     * State is kept in atomics and guarded by compare-and-set so that the start and the stop are
     * each recorded only once (presumably events arrive from several worker threads).
     */
    private static class TimerWorkerListener implements WorkerListener {

        private final AtomicLong startTime = new AtomicLong(0L);
        private final AtomicLong stopTime = new AtomicLong(0L);
        private final AtomicInteger countdown = new AtomicInteger(TOTAL_JOBS);
        private final WorkerPool workerPool;

        /**
         * @param workerPool the pool to end once every job has completed
         */
        public TimerWorkerListener(final WorkerPool workerPool) {
            this.workerPool = workerPool;
        }

        @Override
        public void onEvent(final WorkerEvent event, final Worker worker, final String queue, final Job job,
                final Object runner, final Object result, final Throwable ex) {
            if (event == WORKER_START && this.startTime.compareAndSet(0L, System.currentTimeMillis())) {
                LOG.info("Started the clock...");
            }
            // Only the caller that wins the compare-and-set reports the result and ends the pool.
            if (event == JOB_SUCCESS && this.countdown.decrementAndGet() <= 0
                    && this.stopTime.compareAndSet(0L, System.currentTimeMillis())) {
                final long timeDiff = this.stopTime.get() - this.startTime.get();
                // Report the number of jobs actually counted, not just one enqueuer's share.
                LOG.info("Completed {} jobs in {}ms - Avg. {} jobs/sec",
                        new Object[] { TOTAL_JOBS, timeDiff, (TOTAL_JOBS * 1000.0 / timeDiff) });
                this.workerPool.end(false);
            }
        }
    }

    /**
     * The job executed by the workers: it only logs its arguments at debug level.
     * <p>
     * Registered under the name {@code "TestAction"} in the job factory; the constructor
     * parameters match the argument array used when the job is enqueued.
     */
    public static class TestAction implements Runnable {

        private final Integer i;
        private final Double d;
        private final Boolean b;
        private final String s;
        private final List<Object> l;

        public TestAction(final Integer i, final Double d, final Boolean b, final String s, final List<Object> l) {
            this.i = i;
            this.d = d;
            this.b = b;
            this.s = s;
            this.l = l;
        }

        @Override
        public void run() {
            LOG.debug("TestAction.run() {} {} {} {} {}", new Object[] { this.i, this.d, this.b, this.s, this.l });
        }
    }
}
package net.greghaines.jesque.perftest;
import static net.greghaines.jesque.utils.JesqueUtils.entry;
import static net.greghaines.jesque.utils.JesqueUtils.map;
import static net.greghaines.jesque.worker.WorkerEvent.JOB_SUCCESS;
import static net.greghaines.jesque.worker.WorkerEvent.WORKER_START;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import net.greghaines.jesque.Config;
import net.greghaines.jesque.ConfigBuilder;
import net.greghaines.jesque.Job;
import net.greghaines.jesque.client.Client;
import net.greghaines.jesque.client.ClientImpl;
import net.greghaines.jesque.worker.MapBasedJobFactory;
import net.greghaines.jesque.worker.Worker;
import net.greghaines.jesque.worker.WorkerEvent;
import net.greghaines.jesque.worker.WorkerImpl;
import net.greghaines.jesque.worker.WorkerListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Micro-benchmark that measures how quickly a single Jesque worker drains a queue that has been
 * pre-filled with no-op jobs.
 * <p>
 * The queue is filled completely before the worker thread is started; the clock starts on the
 * {@code WORKER_START} event and stops when the last expected {@code JOB_SUCCESS} event arrives.
 */
public class PerfTest {

    private static final Logger LOG = LoggerFactory.getLogger(PerfTest.class);
    private static final Config CONFIG = new ConfigBuilder().build();
    private static final String TEST_QUEUE = "foo";
    /** Number of jobs pushed by each enqueuer. */
    private static final int JOB_COUNT = 100000;
    /** Number of enqueuer passes executed before the worker starts. */
    private static final int NUM_ENQUEUERS = 1;
    /** Total number of jobs the worker must complete before the clock is stopped. */
    private static final int TOTAL_JOBS = JOB_COUNT * NUM_ENQUEUERS;

    /**
     * Fills the test queue, runs the worker on its own thread and waits for it to finish.
     *
     * @param args ignored
     */
    public static void main(final String... args) {
        LOG.info("Starting test...");
        final Worker worker = new WorkerImpl(CONFIG, Arrays.asList(TEST_QUEUE),
                new MapBasedJobFactory(map(entry("TestAction", TestAction.class))));
        final TimerWorkerListener timer = new TimerWorkerListener();
        worker.getWorkerEventEmitter().addListener(timer, WORKER_START, JOB_SUCCESS);
        final Thread workerThread = new Thread(worker);
        // Enqueue exactly as many jobs as the listener counts down from; otherwise the
        // countdown never reaches zero and the untimed join below would block forever.
        for (int i = 0; i < NUM_ENQUEUERS; i++) {
            new Enqueuer().run();
        }
        LOG.info("Enqueue complete!");
        workerThread.start();
        try {
            workerThread.join();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            LOG.warn("Exception while waiting for workerThread to join", e);
        }
        LOG.info("Test complete!");
    }

    /** Pushes {@code JOB_COUNT} copies of the same {@code TestAction} job onto the test queue. */
    private static class Enqueuer implements Runnable {

        @Override
        public void run() {
            final Client client = new ClientImpl(CONFIG);
            try {
                final Job job = new Job("TestAction",
                        new Object[] { 1, 2.3, true, "test", Arrays.asList("inner", 4.5) });
                for (int i = 0; i < JOB_COUNT; i++) {
                    client.enqueue(TEST_QUEUE, job);
                }
            } finally {
                // Always release the client, even if an enqueue fails part-way through.
                client.end();
            }
        }
    }

    /**
     * Starts the clock on worker start, counts successful jobs and, once all expected jobs have
     * succeeded, logs the throughput and ends the worker that delivered the final event.
     */
    private static class TimerWorkerListener implements WorkerListener {

        private final AtomicLong startTime = new AtomicLong(0L);
        private final AtomicLong stopTime = new AtomicLong(0L);
        private final AtomicInteger countdown = new AtomicInteger(TOTAL_JOBS);

        @Override
        public void onEvent(final WorkerEvent event, final Worker worker, final String queue, final Job job,
                final Object runner, final Object result, final Throwable ex) {
            if (event == WORKER_START && this.startTime.compareAndSet(0L, System.currentTimeMillis())) {
                LOG.info("Started the clock...");
            }
            // The compare-and-set ensures the result is reported and the worker ended only once.
            if (event == JOB_SUCCESS && this.countdown.decrementAndGet() <= 0
                    && this.stopTime.compareAndSet(0L, System.currentTimeMillis())) {
                final long timeDiff = this.stopTime.get() - this.startTime.get();
                // Report the number of jobs actually counted, not just one enqueuer's share.
                LOG.info("Completed {} jobs in {}ms - Avg. {} jobs/sec",
                        new Object[] { TOTAL_JOBS, timeDiff, (TOTAL_JOBS * 1000.0 / timeDiff) });
                worker.end(false);
            }
        }
    }

    /**
     * The job executed by the worker: it only logs its arguments at debug level.
     * <p>
     * Registered under the name {@code "TestAction"} in the job factory; the constructor
     * parameters match the argument array used when the job is enqueued.
     */
    public static class TestAction implements Runnable {

        private final Integer i;
        private final Double d;
        private final Boolean b;
        private final String s;
        private final List<Object> l;

        public TestAction(final Integer i, final Double d, final Boolean b, final String s, final List<Object> l) {
            this.i = i;
            this.d = d;
            this.b = b;
            this.s = s;
            this.l = l;
        }

        @Override
        public void run() {
            LOG.debug("TestAction.run() {} {} {} {} {}", new Object[] { this.i, this.d, this.b, this.s, this.l });
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment