Skip to content

Instantly share code, notes, and snippets.

@mikehearn
Last active November 18, 2020 09:26
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mikehearn/4781ce7f00228762adfb to your computer and use it in GitHub Desktop.
Save mikehearn/4781ce7f00228762adfb to your computer and use it in GitHub Desktop.
Some code for JavaFX observable collections (maps and sets) which replicate changes between threads. From the open source, Apache licensed Lighthouse project. Check there for the latest code.
// Contact: hearn@vinumeris.com
package lighthouse.threading;
import com.google.common.util.concurrent.Uninterruptibles;
import javafx.application.Platform;
import lighthouse.protocol.LHUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import static com.google.common.base.Preconditions.checkState;
import static lighthouse.protocol.LHUtils.checkedGet;
/** An extended executor interface that supports thread affinity assertions and short circuiting. */
public interface AffinityExecutor extends Executor {
/** Returns true if the current thread is equal to the thread this executor is backed by. */
public boolean isOnThread();
/** Throws an IllegalStateException if the current thread is equal to the thread this executor is backed by. */
public void checkOnThread();
/** If isOnThread() then runnable is invoked immediately, otherwise the closure is queued onto the backing thread. */
public void executeASAP(LHUtils.UncheckedRunnable runnable);
/**
* Runs the given function on the executor, blocking until the result is available. Be careful not to deadlock this
* way! Make sure the executor can't possibly be waiting for the calling thread.
*/
public default <T> T fetchFrom(Supplier<T> fetcher) {
if (isOnThread())
return fetcher.get();
else
return checkedGet(CompletableFuture.supplyAsync(fetcher, this));
}
public abstract static class BaseAffinityExecutor implements AffinityExecutor {
protected final Thread.UncaughtExceptionHandler exceptionHandler;
protected BaseAffinityExecutor() {
exceptionHandler = Thread.currentThread().getUncaughtExceptionHandler();
}
@Override
public abstract boolean isOnThread();
@Override
public void checkOnThread() {
checkState(isOnThread(), "On wrong thread: %s", Thread.currentThread());
}
@Override
public void executeASAP(LHUtils.UncheckedRunnable runnable) {
final Runnable command = () -> {
try {
runnable.run();
} catch (Throwable throwable) {
exceptionHandler.uncaughtException(Thread.currentThread(), throwable);
}
};
if (isOnThread())
command.run();
else {
execute(command);
}
}
// Must comply with the Executor definition w.r.t. exceptions here.
@Override
public abstract void execute(Runnable command);
}
public static AffinityExecutor UI_THREAD = new BaseAffinityExecutor() {
@Override
public boolean isOnThread() {
return Platform.isFxApplicationThread();
}
@Override
public void execute(Runnable command) {
Platform.runLater(command);
}
};
public static AffinityExecutor SAME_THREAD = new BaseAffinityExecutor() {
@Override
public boolean isOnThread() {
return true;
}
@Override
public void execute(Runnable command) {
command.run();
}
};
public static class ServiceAffinityExecutor extends BaseAffinityExecutor {
private static final Logger log = LoggerFactory.getLogger(ServiceAffinityExecutor.class);
protected AtomicReference<Thread> whichThread = new AtomicReference<>(null);
private final Thread.UncaughtExceptionHandler handler = Thread.currentThread().getUncaughtExceptionHandler();
public final ScheduledThreadPoolExecutor service;
public ServiceAffinityExecutor(String threadName) {
service = new ScheduledThreadPoolExecutor(1, runnable -> {
Thread thread = new Thread(runnable);
thread.setDaemon(true);
thread.setName(threadName);
whichThread.set(thread);
return thread;
}, (runnable, executor) -> {
log.warn("Ignored execution attempt due to shutdown: {}", runnable);
});
}
@Override
public boolean isOnThread() {
return Thread.currentThread() == whichThread.get();
}
@Override
public void execute(Runnable command) {
service.execute(() -> {
try {
command.run();
} catch (Throwable e) {
if (handler != null)
handler.uncaughtException(Thread.currentThread(), e);
else
e.printStackTrace();
}
});
}
}
/**
* An executor useful for unit tests: allows the current thread to block until a command arrives from another
* thread, which is then executed. Inbound closures/commands stack up until they are cleared by looping.
*/
public static class Gate extends BaseAffinityExecutor {
private final Thread thisThread = Thread.currentThread();
private final LinkedBlockingQueue<Runnable> commandQ = new LinkedBlockingQueue<>();
private final boolean alwaysQueue;
public Gate() {
this(false);
}
/** If alwaysQueue is true, executeASAP will never short-circuit and will always queue up. */
public Gate(boolean alwaysQueue) {
this.alwaysQueue = alwaysQueue;
}
@Override
public boolean isOnThread() {
return !alwaysQueue && Thread.currentThread() == thisThread;
}
@Override
public void execute(Runnable command) {
Uninterruptibles.putUninterruptibly(commandQ, command);
}
public void waitAndRun() {
final Runnable runnable = Uninterruptibles.takeUninterruptibly(commandQ);
runnable.run();
}
public int getTaskQueueSize() {
return commandQ.size();
}
}
}
// Contact: hearn@vinumeris.com
public class LHUtils {
private static final Logger log = LoggerFactory.getLogger(LHUtils.class);
public static List<Path> listDir(Path dir) throws IOException {
List<Path> contents = new LinkedList<>();
try (Stream<Path> list = Files.list(dir)) {
list.forEach(contents::add);
}
return contents;
}
//region Generic Java 8 enhancements
public interface UncheckedRun<T> {
public T run() throws Throwable;
}
public interface UncheckedRunnable {
public void run() throws Throwable;
}
public static <T> T unchecked(UncheckedRun<T> run) {
try {
return run.run();
} catch (Throwable throwable) {
throw new RuntimeException(throwable);
}
}
public static void uncheck(UncheckedRunnable run) {
try {
run.run();
} catch (Throwable throwable) {
throw new RuntimeException(throwable);
}
}
public static void ignoreAndLog(UncheckedRunnable runnable) {
try {
runnable.run();
} catch (Throwable t) {
log.error("Ignoring error", t);
}
}
public static <T> T ignoredAndLogged(UncheckedRun<T> runnable) {
try {
return runnable.run();
} catch (Throwable t) {
log.error("Ignoring error", t);
return null;
}
}
@SuppressWarnings("unchecked")
public static <T, E extends Throwable> T checkedGet(Future<T> future) throws E {
try {
return future.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw (E) e.getCause();
}
}
public static boolean didThrow(UncheckedRun run) {
try {
run.run();
return false;
} catch (Throwable throwable) {
return true;
}
}
public static boolean didThrow(UncheckedRunnable run) {
try {
run.run();
return false;
} catch (Throwable throwable) {
return true;
}
}
public static <T> T stopwatched(String description, UncheckedRun<T> run) {
long now = System.currentTimeMillis();
T result = unchecked(run::run);
log.info("{}: {}ms", description, System.currentTimeMillis() - now);
return result;
}
public static void stopwatch(String description, UncheckedRunnable run) {
long now = System.currentTimeMillis();
uncheck(run::run);
log.info("{}: {}ms", description, System.currentTimeMillis() - now);
}
//endregion
}
// Contact: hearn@vinumeris.com
package lighthouse.threading;
import javafx.beans.InvalidationListener;
import javafx.beans.Observable;
import javafx.collections.*;
import java.util.concurrent.Executor;
/**
* An attempt to make multi-threading and observable/reactive UI programming work together inside JavaFX without too
* many headaches. This class allows you to register change listeners on the target Observable which will be
* run with the given {@link java.util.concurrent.Executor}. In this way an observable collection which is updated by
* one thread can be observed from another thread without needing to use explicit locks or explicit marshalling.
*/
public class MarshallingObservers {
public static InvalidationListener addListener(Observable observable, InvalidationListener listener, Executor executor) {
InvalidationListener l = x -> executor.execute(() -> listener.invalidated(x));
observable.addListener(l);
return l;
}
public static <T> ListChangeListener<T> addListener(ObservableList<T> observable, ListChangeListener<T> listener, Executor executor) {
ListChangeListener<T> l = (ListChangeListener.Change<? extends T> c) -> executor.execute(() -> {
// Change objects are not thread safe. They may be reused by listeners following this one. However,
// we cheat here and exploit knowledge of the implementation: a change is basically immutable and
// self contained except for the iteration state. So we synchronize on the change and reset it at the
// start to ensure we can iterate over it safely. Note that set changes actually are immutable and
// so don't need this.
synchronized (c) {
c.reset();
listener.onChanged(c);
}
});
observable.addListener(l);
return l;
}
public static <T> SetChangeListener<T> addListener(ObservableSet<T> observable, SetChangeListener<T> listener, Executor executor) {
SetChangeListener<T> l = (SetChangeListener.Change<? extends T> c) -> executor.execute(() -> listener.onChanged(c));
observable.addListener(l);
return l;
}
public static <K, V> MapChangeListener<K, V> addListener(ObservableMap<K, V> observable, MapChangeListener<K, V> listener, Executor executor) {
MapChangeListener<K, V> l = (MapChangeListener.Change<? extends K, ? extends V> c) -> executor.execute(() -> listener.onChanged(c));
observable.addListener(l);
return l;
}
}
// Contact: hearn@vinumeris.com
package lighthouse.threading;
import javafx.beans.WeakListener;
import javafx.collections.*;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
/**
* Utility functions that mirror changes from one list into another list. JavaFX already provides this functionality
* of course under the name "content binding"; a mirror is a content binding that relays changes into other threads
* first. Thus you can have an ObservableList which is updated in one thread, but still bound to directly in the UI
* thread, without needing to worry about cross-thread interference.
*/
public class ObservableMirrors {
/**
* Creates an unmodifiable list that asynchronously follows changes in mirrored, with changes applied using
* the given executor. This should only be called on the thread that owns the list to be mirrored, as the contents
* will be read.
*/
public static <T> ObservableList<T> mirrorList(ObservableList<T> mirrored, AffinityExecutor runChangesIn) {
ObservableList<T> result = FXCollections.observableArrayList();
result.setAll(mirrored);
mirrored.addListener(new ListMirror<T>(result, runChangesIn));
return FXCollections.unmodifiableObservableList(result);
}
private static class ListMirror<E> implements ListChangeListener<E>, WeakListener {
private final WeakReference<ObservableList<E>> targetList;
private final AffinityExecutor runChangesIn;
public ListMirror(ObservableList<E> list, AffinityExecutor runChangesIn) {
this.targetList = new WeakReference<>(list);
this.runChangesIn = runChangesIn;
}
@Override
public void onChanged(Change<? extends E> change) {
final List<E> list = targetList.get();
if (list == null) {
change.getList().removeListener(this);
} else {
// If we're already in the right thread this will just run the change immediately, as per normal.
// Change objects are not thread safe. They may be reused by listeners following this one. However,
// we cheat here and exploit knowledge of the implementation: a change is basically immutable and
// self contained except for the iteration state. So we synchronize on the change and reset it at the
// start to ensure we can iterate over it safely. Note that set changes actually are immutable and
// so don't need this.
LinkedList<List<? extends E>> sublists = new LinkedList<>();
while (change.next()) {
if (change.wasPermutated()) {
sublists.add(new ArrayList<>(change.getList().subList(change.getFrom(), change.getTo())));
} else if (change.wasAdded()) {
sublists.add(new ArrayList<>(change.getAddedSubList()));
}
}
runChangesIn.executeASAP(() -> {
synchronized (change) {
change.reset();
while (change.next()) {
if (change.wasPermutated()) {
list.subList(change.getFrom(), change.getTo()).clear();
list.addAll(change.getFrom(), sublists.pollFirst());
} else {
if (change.wasRemoved()) {
list.subList(change.getFrom(), change.getFrom() + change.getRemovedSize()).clear();
}
if (change.wasAdded()) {
list.addAll(change.getFrom(), sublists.pollFirst());
}
}
}
}
});
}
}
@Override
public boolean wasGarbageCollected() {
return targetList.get() == null;
}
// Do we really need these?
@Override
public int hashCode() {
final List<E> list = targetList.get();
return (list == null)? 0 : list.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
final List<E> list1 = targetList.get();
if (list1 == null) {
return false;
}
if (obj instanceof ListMirror) {
final ListMirror<?> other = (ListMirror<?>) obj;
final List<?> list2 = other.targetList.get();
return list1 == list2;
}
return false;
}
}
public static <K, V> ObservableMap<K, V> mirrorMap(ObservableMap<K, V> mirrored, AffinityExecutor runChangesIn) {
ObservableMap<K, V> result = FXCollections.observableHashMap();
result.putAll(mirrored);
mirrored.addListener(new MapMirror<K, V>(result, runChangesIn));
return result;
}
private static class MapMirror<K, V> implements MapChangeListener<K, V>, WeakListener {
private final WeakReference<ObservableMap<K, V>> targetMap;
private final AffinityExecutor runChangesIn;
public MapMirror(ObservableMap<K, V> targetMap, AffinityExecutor runChangesIn) {
this.targetMap = new WeakReference<>(targetMap);
this.runChangesIn = runChangesIn;
}
@Override
public boolean wasGarbageCollected() {
return targetMap.get() == null;
}
@Override
public void onChanged(Change<? extends K, ? extends V> change) {
final ObservableMap<K, V> map = targetMap.get();
if (map == null) {
change.getMap().removeListener(this);
} else {
runChangesIn.executeASAP(() -> {
if (change.wasAdded()) {
map.put(change.getKey(), change.getValueAdded());
} else if (change.wasRemoved()) {
map.remove(change.getKey());
}
});
}
}
}
/**
* Creates an unmodifiable list that asynchronously follows changes in mirrored, with changes applied using
* the given executor. This should only be called on the thread that owns the list to be mirrored, as the contents
* will be read.
*/
public static <T> ObservableSet<T> mirrorSet(ObservableSet<T> mirrored, AffinityExecutor runChangesIn) {
@SuppressWarnings("unchecked") ObservableSet<T> result = FXCollections.observableSet();
result.addAll(mirrored);
mirrored.addListener(new SetMirror<T>(result, runChangesIn));
return FXCollections.unmodifiableObservableSet(result);
}
private static class SetMirror<E> implements SetChangeListener<E>, WeakListener {
private final WeakReference<ObservableSet<E>> targetSet;
private final AffinityExecutor runChangesIn;
public SetMirror(ObservableSet<E> set, AffinityExecutor runChangesIn) {
this.targetSet = new WeakReference<>(set);
this.runChangesIn = runChangesIn;
}
@Override
public void onChanged(final Change<? extends E> change) {
final ObservableSet<E> set = targetSet.get();
if (set == null) {
change.getSet().removeListener(this);
} else {
// If we're already in the right thread this will just run the change immediately, as per normal.
runChangesIn.executeASAP(() -> {
if (change.wasAdded())
set.add(change.getElementAdded());
if (change.wasRemoved())
set.remove(change.getElementRemoved());
});
}
}
@Override
public boolean wasGarbageCollected() {
return targetSet.get() == null;
}
@Override
public int hashCode() {
final ObservableSet<E> set = targetSet.get();
return (set == null)? 0 : set.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
final Set<E> set1 = targetSet.get();
if (set1 == null) {
return false;
}
if (obj instanceof SetMirror) {
final SetMirror<?> other = (SetMirror<?>) obj;
final Set<?> list2 = other.targetSet.get();
return set1 == list2;
}
return false;
}
}
}
@TomasMikula
Copy link

Hi Mike,

if I'm not missing something, in your ListMirror class, calls to

change.getList().subList(from, to)

and

change.getAddedSubList()

from the runChangesIn executor (i.e. generally from a different thread) will return wrong results or throw out-of-bounds exception in case the source list has been modified before executeASAP actually executed the action. Am I right?

@mikehearn
Copy link
Author

hey @TomasMikula sorry I didn't see this message before. Yes you were right. It's fixed in the current code.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment