Skip to content

Instantly share code, notes, and snippets.

@ennerf
Created January 10, 2023 22:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ennerf/d78eab936d9f4f53dca9ef9170d9baa1 to your computer and use it in GitHub Desktop.
Save ennerf/d78eab936d9f4f53dca9ef9170d9baa1 to your computer and use it in GitHub Desktop.
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventPoller;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import javafx.animation.AnimationTimer;
/**
* Uses a JavaFX AnimationTimer that gets called once per frame to poll
* events from a Disruptor. This automatically batches events down to the
* frame rate and can remove other synchronization code for getting data
* into the FX thread.
*
* @author Florian Enner
* @since 24 Feb 2021
*/
public class DisruptorFx {
public static void example() {
var disruptor = new Disruptor<>(FeedbackEvent::new, 1024, DaemonThreadFactory.INSTANCE);
// consume on the FXAT
var poller = DisruptorFx.newFxPoller(disruptor, ((event, sequence, endOfBatch) -> {
// render something
System.out.println(event.position);
}));
// start
disruptor.start();
poller.start();
// produce data on some other thread
var buffer = disruptor.getRingBuffer();
for (int i = 0; i < 100; i++) {
long sequence = buffer.next();
var event = buffer.get(sequence);
event.position = i;
buffer.publish(sequence);
}
}
static class FeedbackEvent {
double position = Double.NaN;
}
public static <T> FxPoller<T> newFxPoller(Disruptor<T> disruptor, EventHandler<T> handler) {
return newFxPoller(disruptor, (event, sequence, endOfBatch) -> {
handler.onEvent(event, sequence, endOfBatch);
return true; // false would exit the batch early (e.g. timeout or to avoid blocking a writer)
});
}
public static <T> FxPoller<T> newFxPoller(Disruptor<T> disruptor, EventPoller.Handler<T> handler) {
return new FxPoller<>(disruptor.getRingBuffer().newPoller(), handler);
}
static class FxPoller<T> extends AnimationTimer {
FxPoller(EventPoller<T> poller, EventPoller.Handler<T> handler) {
this.poller = poller;
this.handler = handler;
}
final EventPoller<T> poller;
final EventPoller.Handler<T> handler;
@Override
public void handle(long now) {
try {
// A single call takes all currently available sequences and
// processes them in a batch. Handlers may return early by
// returning false.
poller.poll(handler);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment