Skip to content

Instantly share code, notes, and snippets.

@americanstone
Created October 17, 2017 15:47
Show Gist options
  • Save americanstone/44b81ff59195e6093dcf6f4a3e1e24f6 to your computer and use it in GitHub Desktop.
Save americanstone/44b81ff59195e6093dcf6f4a3e1e24f6 to your computer and use it in GitHub Desktop.
A recursive, result-less ForkJoinTask. The BiFunction takes a single feed plus a shared context and produces a result; results are written to the output array in feed order. The context is shared by all worker threads, so the caller needs to make sure the context is thread-safe.
package com.dealer.modules.cms.promotions.services;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.List;
import java.util.concurrent.RecursiveAction;
import java.util.function.BiFunction;
/**
 * A recursive, result-less {@link RecursiveAction} that applies a {@link BiFunction} to every
 * feed of a list and stores the outcomes in a caller-supplied array.
 *
 * <p>The index range of the feed list is halved recursively until a range contains at most
 * {@code threshold} feeds; such a leaf range is then processed sequentially. Each leaf writes
 * only into the slots of its own index range, so two tasks never write the same slot and the
 * non-null results appear in the array in feed order.
 *
 * <p>Within a leaf range the results are compacted: a feed whose function returns {@code null}
 * or throws does not consume a slot, so the unused slots at the end of that leaf range are left
 * untouched. Callers should therefore expect untouched (typically {@code null}) slots in the
 * array.
 *
 * <p>The {@code context} object is shared by every task and therefore by every worker thread;
 * the caller must make sure it is thread-safe. The {@code results} array must be at least as
 * long as the feed list.
 *
 * <p>There is an example demonstrating the usage in PromotionsService.
 *
 * @param <T> the type of the first argument to the function (a single feed)
 * @param <U> the type of the second argument to the function (the shared context)
 * @param <R> the type of the result of the function
 */
public class OrderedParallelLoading<T, U, R> extends RecursiveAction {

    /** Leaf size used whenever the caller passes a non-positive threshold. */
    private static final int DEFAULT_THRESHOLD = 5;

    private static final Log LOG = LogFactory.getLog(OrderedParallelLoading.class);

    private final List<T> feeds;
    private final U context;
    private final R[] results;
    private final BiFunction<T, U, R> fn;
    private final int start;
    private final int end;
    private final int threshold;

    /**
     * Creates the root task covering the whole feed list.
     *
     * @param feeds     the feeds to process; must not be {@code null}
     * @param context   the object handed to every function call; shared across threads
     * @param fn        the function applied to each feed together with {@code context}
     * @param results   the output array; slots {@code 0 .. feeds.size() - 1} may be written
     * @param threshold the largest range processed without further splitting; a value of
     *                  zero or less selects the default of 5
     */
    OrderedParallelLoading(List<T> feeds, U context, BiFunction<T, U, R> fn, R[] results, int threshold) {
        this(feeds, context, fn, 0, feeds.size(), results, threshold);
    }

    /**
     * Creates a task covering the feeds in the index range {@code [start, end)}.
     *
     * @param feeds     the complete feed list
     * @param context   the object handed to every function call; shared across threads
     * @param fn        the function applied to each feed together with {@code context}
     * @param start     index of the first feed of the range (inclusive)
     * @param end       index just past the last feed of the range (exclusive)
     * @param results   the output array; only slots inside {@code [start, end)} are written
     * @param threshold the largest range processed without further splitting; a value of
     *                  zero or less selects the default of 5
     */
    OrderedParallelLoading(List<T> feeds, U context, BiFunction<T, U, R> fn, int start, int end, R[] results, int threshold) {
        this.feeds = feeds;
        this.context = context;
        this.fn = fn;
        this.start = start;
        this.end = end;
        this.results = results;
        // A non-positive threshold would make compute() split a one-element range into an
        // empty half and an identical half forever, so it is replaced by the default.
        this.threshold = threshold > 0 ? threshold : DEFAULT_THRESHOLD;
    }

    /**
     * Processes the range directly when it is small enough, otherwise splits it in two halves,
     * forks the left half and computes the right half in the current thread before joining.
     */
    @Override
    protected void compute() {
        int length = end - start;
        if (length <= threshold) {
            // Small enough: do the actual work sequentially.
            task(feeds.subList(start, end), start);
            return;
        }

        // Divide and conquer: threshold >= 1 guarantees length >= 2 here, so both halves
        // are strictly smaller than the current range.
        int middle = start + length / 2;
        OrderedParallelLoading<T, U, R> leftTask =
                new OrderedParallelLoading<>(feeds, context, fn, start, middle, results, threshold);
        leftTask.fork();
        OrderedParallelLoading<T, U, R> rightTask =
                new OrderedParallelLoading<>(feeds, context, fn, middle, end, results, threshold);
        rightTask.compute();
        leftTask.join();
    }

    /**
     * Applies the function to each feed of {@code chunk} and stores every non-null result in
     * consecutive slots of the results array, beginning at {@code firstSlot}.
     *
     * <p>A failure for one feed is logged and does not stop the remaining feeds from being
     * processed; neither a failure nor a {@code null} result advances the write position.
     *
     * @param chunk     the feeds of this task's range
     * @param firstSlot the index of the first slot this task may write
     */
    private void task(List<T> chunk, int firstSlot) {
        int slot = firstSlot;
        for (T feed : chunk) {
            try {
                R result = fn.apply(feed, context); // be careful: context is shared by all threads!
                if (result != null) {
                    // Only this task writes to its own index range, which keeps the order.
                    results[slot] = result;
                    slot++;
                }
            } catch (Exception e) {
                // Best effort: keep going with the next feed, but pass the exception as the
                // throwable argument so the logger can report its stack trace.
                LOG.error("Failed to process feed; no result stored for it", e);
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment