Skip to content

Instantly share code, notes, and snippets.

@pettermahlen
Created January 30, 2019 10:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pettermahlen/a0751fd9ec745ea2c5a0f219fef70b62 to your computer and use it in GitHub Desktop.
Save pettermahlen/a0751fd9ec745ea2c5a0f219fef70b62 to your computer and use it in GitHub Desktop.
Repro
/**
 * Repro: repeatedly starts a loop fed by a background-thread event source, lets it run for a
 * random short time, and disposes it again.
 *
 * <p>The event source records exceptions raised on its generator threads and rethrows them from
 * the next {@code subscribe} call, so a failure during one cycle surfaces when the following
 * cycle starts the loop.
 */
@Test
public void shouldDisposeMultiThreadedEventSourceSafely() throws Exception {
  final int cycles = 100;
  final int maxSleepMillis = 30; // exclusive upper bound for the per-cycle run time

  // Event source that just pushes events every few ms on its own thread.
  RecurringEventSource eventSource = new RecurringEventSource();
  Random jitter = new Random();

  final Builder<String, TestEvent, TestEffect> loopBuilder =
      Mobius.loop(update, effectHandler).eventSource(eventSource);

  for (int remaining = cycles; remaining > 0; remaining--) {
    mobiusLoop = loopBuilder.startFrom("foo");
    // Random run time so that dispose() lands at varying points of the generator's cycle.
    Thread.sleep(jitter.nextInt(maxSleepMillis));
    mobiusLoop.dispose();
  }
}
/**
 * Test event source that starts one background thread per subscription; each thread pushes a
 * {@code TestEvent("hi")} to the subscriber about every 15 ms until the subscription is disposed
 * or the thread hits an exception.
 *
 * <p>The first exception seen on any generator thread is stored in {@link #completion}. The next
 * call to {@link #subscribe} rethrows it wrapped in a {@link RuntimeException}, which is how a
 * failure on a background thread becomes visible to the test thread.
 */
private static class RecurringEventSource implements EventSource<TestEvent> {
  /** Within this class only ever completed via {@code setException}; never with a value. */
  final SettableFuture<Void> completion = SettableFuture.create();

  /**
   * Starts a new generator thread feeding {@code eventConsumer}.
   *
   * @param eventConsumer receiver of the generated events
   * @return a disposable that asks the generator thread to stop at its next loop check
   * @throws RuntimeException if an earlier generator thread recorded a failure
   */
  @Nonnull
  @Override
  public Disposable subscribe(Consumer<TestEvent> eventConsumer) {
    if (completion.isDone()) {
      try {
        completion.get(); // should throw since the only way it can complete is exceptionally
      } catch (InterruptedException e) {
        // Keep the interrupt visible to the caller instead of silently clearing it.
        Thread.currentThread().interrupt();
        throw new RuntimeException("handle this", e);
      } catch (ExecutionException e) {
        throw new RuntimeException("handle this", e);
      }
    }

    final Generator generator = new Generator(eventConsumer);
    new Thread(generator).start();
    return () -> {
      generator.generate = false;
    };
  }

  /** Loop body of a generator thread; stops when {@code generate} is cleared or on failure. */
  private class Generator implements Runnable {
    // volatile: written by the disposing thread, read by the generator thread.
    private volatile boolean generate = true;
    private final Consumer<TestEvent> consumer;

    private Generator(Consumer<TestEvent> consumer) {
      this.consumer = consumer;
    }

    @Override
    public void run() {
      while (generate) {
        try {
          consumer.accept(new TestEvent("hi"));
          Thread.sleep(15);
        } catch (InterruptedException e) {
          // Restore the flag, record the failure and stop generating.
          Thread.currentThread().interrupt();
          completion.setException(e);
          return;
        } catch (Exception e) {
          // Record the failure and stop: continuing would re-enter the loop immediately,
          // skipping the sleep and spinning on the same failing call until disposed.
          completion.setException(e);
          return;
        }
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment