@grondag
Last active February 25, 2019 20:45
Server-Side Concurrency API - Sketch
public interface SimulationTickable
{
    /**
     * If true, then {@link #doOnTick()} will be called during the
     * world tick from the server thread. Generally checked only
     * at setup, so the result should not be dynamic.
     */
    public default boolean doesUpdateOnTick() { return false; }

    /**
     * See {@link #doesUpdateOnTick()}
     */
    public default void doOnTick() {}

    /**
     * If true, then {@link #doOffTick(ScatterGatherThreadPool)} will be called
     * once per server tick from the simulation thread pool. Generally checked
     * only at setup, so the result should not be dynamic.
     */
    public default boolean doesUpdateOffTick() { return false; }

    /**
     * See {@link #doesUpdateOffTick()}
     */
    public default void doOffTick(ScatterGatherThreadPool pool) {}
}
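A minimal sketch of how a simulation component might implement this interface. The FlowSimulator and FlowCell names are hypothetical, as are the threshold (64) and batch size (16); the point is that the expensive, parallelizable work runs off-tick through the pool, while only cheap world mutation runs on-tick.

public class FlowSimulator implements SimulationTickable
{
    private static class FlowCell
    {
        volatile double pressure = 1.0;
        // stand-in for expensive, independent per-cell work
        void computeFlow() { pressure = pressure * 0.95; }
    }

    private final FlowCell[] cells;

    public FlowSimulator(int size)
    {
        cells = new FlowCell[size];
        for (int i = 0; i < size; i++) cells[i] = new FlowCell();
    }

    @Override
    public boolean doesUpdateOffTick() { return true; }

    @Override
    public void doOffTick(ScatterGatherThreadPool pool)
    {
        // Scatter independent per-cell computation across the pool and block
        // until done. 64 (concurrency threshold) and 16 (batch size) are
        // assumed values - tune for the actual workload.
        pool.completeTask(cells, 64, FlowCell::computeFlow, 16);
    }

    @Override
    public boolean doesUpdateOnTick() { return true; }

    @Override
    public void doOnTick()
    {
        // Apply the computed results to the world - runs on the server thread.
    }
}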
/**
 * Thread pool optimized for scatter-gather processing patterns over an array, list or
 * other linearly-addressable data structure. Performance is equivalent to a Java fork-join
 * pool for large workloads and appears to have somewhat lower overhead and lower latency for small batches.<p>
 *
 * The main motivation is simplicity: it is much easier (for the author at least) to understand and debug
 * than a custom counted-completer fork-join task, based on actual experience creating one.
 * It is also easier to use and requires less code for its intended use cases.<p>
 *
 * The pool does not have a queue, and all calls to the various flavors of completeTask() are blocking.
 * This design is consistent with the scatter-gather patterns for which this pool is used - the intention
 * is to complete the entire task <em>now</em>, as quickly as possible, and then move on to another
 * task that may depend on those results.<p>
 *
 * Calls into the pool for execution are not thread-safe! (Again, no queue - it can only do one thing at a time.)
 * While usage could be externally synchronized, the intended pattern is to call into the pool
 * from a consumer thread that generates tasks dynamically and/or drains a queue of tasks into the pool.<p>
 *
 * The size of the pool is always the system parallelism level less one, because the calling thread is
 * recruited to do some of the work.<p>
 *
 * Without a queue there is no work stealing; instead, work is apportioned incrementally, with worker threads
 * claiming batches only as they get scheduled. Generally it will not be worthwhile to use the pool
 * unless a submitted task has enough work to keep all threads occupied. Some execution methods accept
 * a concurrency threshold that, if not met, causes the entire task to execute on the calling thread so that
 * special-case logic isn't needed in the calling code.<p>
 *
 * A perfectly efficient pool would always have all threads finishing at the same time.
 * Even with dynamic work assignment, some thread will always finish some finite amount of time after
 * the other threads finish. This waste can be minimized by slicing the work into smaller batches, but
 * that comes at the price of increased overhead, because shared state must be updated with each new batch.
 * Some execution parameters can be used to tune the batch size for a particular workload.<p>
 *
 * Note this pool is <em>NOT</em> suitable as a generic thread pool for tasks that cannot be split across
 * multiple cores and/or that are meant to complete asynchronously. For those, the common ForkJoin pool,
 * a fixed thread pool, or dedicated threads will all be better choices.
 */
public interface ScatterGatherThreadPool
{
    /**
     * Applies the given operation to every in-range element of the array. If the number of elements to be
     * processed is less than the given concurrency threshold, the operations will happen on the calling thread.
     * In either case, blocks until all elements are processed.<p>
     *
     * Use larger batch sizes (and larger thresholds) for fast operations on many elements. Use smaller values
     * for operations that run longer per element.
     */
    <V> void completeTask(V[] inputs, int startIndex, int count, int concurrencyThreshold, Consumer<V> operation, int batchSize);

    <V> void completeTask(V[] inputs, int concurrencyThreshold, Consumer<V> operation, int batchSize);
    <V> void completeTask(V[] inputs, int concurrencyThreshold, Consumer<V> operation);
    <V> void completeTask(V[] inputs, Consumer<V> operation, int batchSize);
    <V> void completeTask(V[] inputs, Consumer<V> operation);
    <V> void completeTask(V[] inputs, int startIndex, int count, Consumer<V> operation, int batchSize);
    <V> void completeTask(V[] inputs, int startIndex, int count, Consumer<V> operation);
    <V> void completeTask(V[] inputs, int startIndex, int count, int concurrencyThreshold, Consumer<V> operation);

    /**
     * Like {@link #completeTask(Object[], int, int, int, Consumer, int)} but with a mapping consumer.
     */
    <T, V> void completeTask(T[] inputs, int startIndex, int count, int concurrencyThreshold, ArrayMappingConsumer<T, V> operation, int batchSize);

    <T, V> void completeTask(T[] inputs, int concurrencyThreshold, ArrayMappingConsumer<T, V> operation, int batchSize);
    <T, V> void completeTask(T[] inputs, int concurrencyThreshold, ArrayMappingConsumer<T, V> operation);
    <T, V> void completeTask(T[] inputs, ArrayMappingConsumer<T, V> operation, int batchSize);
    <T, V> void completeTask(T[] inputs, ArrayMappingConsumer<T, V> operation);
    <T, V> void completeTask(T[] inputs, int startIndex, int count, ArrayMappingConsumer<T, V> operation, int batchSize);
    <T, V> void completeTask(T[] inputs, int startIndex, int count, ArrayMappingConsumer<T, V> operation);
    <T, V> void completeTask(T[] inputs, int startIndex, int count, int concurrencyThreshold, ArrayMappingConsumer<T, V> operation);

    /**
     * Processes a specialized task. Always attempts to use the pool, because no information is
     * provided that would allow evaluation of fitness for concurrency. Blocks until all work is done.
     */
    void completeTask(SharableTask task);

    /**
     * Convenient when your work happens to be in a SimpleConcurrentList.
     */
    <V> void completeTask(SimpleConcurrentList<V> list, Consumer<V> operation);
    <V> void completeTask(SimpleConcurrentList<V> list, int concurrencyThreshold, Consumer<V> operation);
}
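For illustration, a usage sketch of the plain-consumer overloads. The Body type, the threshold of 128, and the batch size of 16 are invented for the example; the pool instance is assumed to come from whatever implementation backs this interface.

public class PoolUsageExample
{
    static class Body
    {
        double x, v;
        // independent per-element work
        void integrate() { x += v * 0.05; }
    }

    static void step(ScatterGatherThreadPool pool, Body[] bodies)
    {
        // Fewer than 128 elements: runs entirely on the calling thread.
        // Otherwise: workers claim batches of 16 until the range is exhausted,
        // and the call blocks until every element has been processed.
        pool.completeTask(bodies, 0, bodies.length, 128, Body::integrate, 16);
    }
}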
/**
 * Similar to a Collector in a Java stream - accumulates results from the mapping function in each thread
 * and then dumps them into a collector after all batches are completed.<p>
 *
 * The right half of the BiConsumer (itself a consumer) provides access to the in-thread sink for map outputs.
 * It is not represented as a map function in order to support functions that are not 1:1 maps.
 */
public class ArrayMappingConsumer<T, V>
{
    private final BiConsumer<T, Consumer<V>> operation;
    private final Consumer<AbstractUnorderedArrayList<V>> collector;

    protected final ThreadLocal<WorkerState> workerStates = new ThreadLocal<WorkerState>()
    {
        @Override
        protected ArrayMappingConsumer<T, V>.WorkerState initialValue()
        {
            return new WorkerState();
        }
    };
    /**
     * For custom collectors - the collector provided must accept an AbstractUnorderedArrayList and will be
     * called in each thread where work was done, after all batches are complete.<p>
     *
     * The collector MUST be thread-safe.
     */
    public ArrayMappingConsumer(BiConsumer<T, Consumer<V>> operation, Consumer<AbstractUnorderedArrayList<V>> collector)
    {
        this.operation = operation;
        this.collector = collector;
    }

    /**
     * The easy way - provide a SimpleConcurrentList as the collector. Note that
     * this implementation does not clear the list between runs. If a consumer is reused,
     * clearing will need to be handled externally if necessary.
     */
    public ArrayMappingConsumer(BiConsumer<T, Consumer<V>> operation, SimpleConcurrentList<V> target)
    {
        this.operation = operation;
        this.collector = (r) -> { if (!r.isEmpty()) target.addAll(r); };
    }
    /**
     * Holds the per-thread results and provides access to the mapping function.
     */
    private class WorkerState extends AbstractUnorderedArrayList<V> implements Consumer<T>
    {
        @Override
        public final void accept(@SuppressWarnings("null") T t)
        {
            operation.accept(t, v -> this.add(v));
        }

        /**
         * Called in each thread after all batches (for that thread) are complete.
         */
        protected final void completeThread()
        {
            collector.accept(this);
            this.clear();
        }
    }

    /**
     * Gets the mapping function for this thread. Using the function will collect output
     * in the calling thread for later consolidation via {@link #completeThread()}.
     */
    protected final Consumer<T> getWorkerConsumer()
    {
        return workerStates.get();
    }

    /**
     * Signals the worker state to perform result consolidation for this thread.
     */
    protected final void completeThread()
    {
        workerStates.get().completeThread();
    }
}
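Finally, a sketch tying the pieces together: a scatter-gather map that is not 1:1, collecting per-thread results into a SimpleConcurrentList. The Chunk type and the threshold of 64 are assumptions for the example, as is the availability of a ready-made SimpleConcurrentList instance.

public class MappingUsageExample
{
    static class Chunk
    {
        boolean dirty;
        String payload() { return "update"; }
    }

    static void gatherUpdates(ScatterGatherThreadPool pool, Chunk[] chunks, SimpleConcurrentList<String> results)
    {
        // Each input may emit zero or more outputs: dirty chunks emit one
        // payload, clean chunks emit nothing. Outputs accumulate per-thread
        // and are added to the target list when each thread's batches finish.
        ArrayMappingConsumer<Chunk, String> op = new ArrayMappingConsumer<Chunk, String>(
                (chunk, sink) -> { if (chunk.dirty) sink.accept(chunk.payload()); },
                results);

        // 64 is an assumed concurrency threshold; blocks until the map completes.
        pool.completeTask(chunks, 64, op);
    }
}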