Skip to content

Instantly share code, notes, and snippets.

@DarkSeraphim
Last active June 7, 2021 16:01
Show Gist options
  • Save DarkSeraphim/a5d0aeb91d94cf47e91905cd60eaa768 to your computer and use it in GitHub Desktop.
Save DarkSeraphim/a5d0aeb91d94cf47e91905cd60eaa768 to your computer and use it in GitHub Desktop.
lock-free, eventually consistent Listener queues, with concurrent adds, iteration and removes. (CAUTION: definitely not tested properly)
interface Listener<T> {
boolean isActive(); // active, default true
void remove(); // active = false
void fire(T event);
}
private final AtomicInteger next = new AtomicInteger(0);
private final AtomicInteger listenerCount = new AtomicInteger(0);
private final AtomicInteger removed = new AtomicInteger(0);
private volatile Listener<T>[] listeners = createArray(16);
private void waitForResize() {
while (!isResizing.get()) {
Thread.onSpinWait();
}
}
public AutoClosable add(Listener<T> listener) {
int index = next.getAndIncrement();
while (index >= listeners.length) {
// TODO: we need to ensure this doesn't trigger again right after a resize
waitForResize();
if (index >= listeners.length && isResizing.swap(false, true)) {
int actualLength = pending.length - removed.get(); // Can do this lazily I think
pendingBuffer = createArray(actualLength * 2); // TODO: shortcut when removed is actually big enough to account for the future allocation
while (listenerCount.get() != listener.length) {
// Wait for all adds within capacity to finish
// corollary: all active and incoming adds will be trapped in the while loop, we can reset their count at the end of the while
Thread.onSpinWait();
}
int ptr = 0;
for (var listener : listeners) {
if (listener != null && listener.isActive()) {
pendingBuffer[ptr] = listener;
ptr++; // move ptr to next
}
}
listeners = pendingBuffer;
next.set(ptr); // Move back ptr
listenerCount.set(ptr); // Does this work?
removed.set(0); // Reset removed counter
isResizing.set(false);
} else {
waitForResize();
}
index = next.getAndIncrement(); // Start loop again
}
listeners[index] = listener;
listenerCount.increment();
return listener::remove;
}
private void remove(Listener<T> listener) {
waitForResize(); // Don't allow removals _while_ we're resizing
listener.remove();
removed.increment();
}
void fire(T event) {
for (Listener<T> listener : listeners) {
listener.fire(event);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment