Skip to content

Instantly share code, notes, and snippets.

@dgomesbr
Last active August 29, 2015 14:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dgomesbr/5b9ab4087555c5502876 to your computer and use it in GitHub Desktop.
Save dgomesbr/5b9ab4087555c5502876 to your computer and use it in GitHub Desktop.
Example of parallel processing on ConcurrentChunkProcessor
/**
 * Chunk processor that hands each item of a chunk to a thread pool instead of
 * processing it on the calling thread. Same as SimpleChunkProcessor except
 * where shown otherwise below.
 *
 * <p>Processing is fire-and-forget: {@code transform} submits one task per item
 * and returns without waiting for, or collecting, the results.
 */
public class ConcurrentChunkProcessor<I, O> implements ChunkProcessor<I>, InitializingBean {
    // ...

    // Example of parallel processing using an ExecutorService. Better to have Spring create
    // this and inject it if available, or to reuse the TaskExecutor from the context.

    /** Keep-alive, in seconds, after which an idle worker thread is released. */
    private static final int THREADPOOL_TIMEOUT_IN_SECONDS = 1;

    /** Number of worker threads, and also the capacity of the pending-task queue. */
    private static final int THREADPOOL_PROCESSOR_NUMBER = Runtime.getRuntime().availableProcessors() * 2;

    private final ExecutorService executorService = newExecutorService();

    /**
     * Builds the fixed-size pool used by {@code transform}.
     *
     * <p>The queue is bounded and the rejection policy is
     * {@link ThreadPoolExecutor.CallerRunsPolicy}, so once the queue is full the
     * submitting thread runs the task itself; this throttles submission instead
     * of failing it.
     *
     * @return a pool whose idle workers time out after the keep-alive period
     */
    private static ExecutorService newExecutorService() {
        ThreadPoolExecutor pool =
            new ThreadPoolExecutor(
                THREADPOOL_PROCESSOR_NUMBER, // core thread pool size
                THREADPOOL_PROCESSOR_NUMBER, // maximum thread pool size
                THREADPOOL_TIMEOUT_IN_SECONDS, // idle time before a worker is released
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(THREADPOOL_PROCESSOR_NUMBER, true),
                new ThreadPoolExecutor.CallerRunsPolicy());
        // Core size equals maximum size, so without this the keep-alive above never applies
        // and the idle (non-daemon) workers of a pool that is never shut down live forever.
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    //...

    /**
     * Submits every item of the chunk for asynchronous processing.
     *
     * <p>The output of the item processor is deliberately not needed here, so
     * each item is removed from {@code inputs} as soon as its task has been
     * submitted, without knowledge of the processing result. Consequences:
     * <ul>
     *   <li>tasks may still be running when this method returns;</li>
     *   <li>the returned chunk is always empty;</li>
     *   <li>a failing task is not rethrown here; it is only reported by
     *       {@code TransformTask} through the listener's {@code onProcessError}.</li>
     * </ul>
     * A caller that needs the results or the failures has to keep the
     * {@code Future} returned by {@code submit} and call {@code get()} on it.
     *
     * @param contribution the current step contribution (not used by this implementation)
     * @param inputs the items to process; emptied by this method
     * @return an empty chunk
     * @throws Exception declared for signature compatibility; task failures are not propagated
     */
    protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception {
        Chunk<O> outputs = new Chunk<O>();
        int chunkSize = inputs.getItems().size();
        logger.info("Processing chunk[" + chunkSize + "]");

        for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
            final I item = iterator.next();
            Callable<O> transformTask = new TransformTask<O,I>(itemProcessor,listener,item);
            executorService.submit(transformTask);
            // Removed early: the task alone is responsible for the item from here on.
            iterator.remove();
        }

        // Don't shut the executor down here; remember to move it outside this class and
        // let Spring handle its lifecycle gracefully.
        return outputs;
    }
}
/**
 * Callable that runs a single item through an item processor and notifies the
 * batch listener before processing, after processing, and on error.
 *
 * @param <O> type of the processed output
 * @param <I> type of the input item
 */
public class TransformTask<O,I> implements Callable<O> {
    // All fields are assigned once in the constructor and only read afterwards.
    private final ItemProcessor<? super I, ? extends O> itemProcessor;
    private final MulticasterBatchListener<I, O> listener;
    private final I item;

    /**
     * @param itemProcessor processor to apply; may be {@code null}, in which case
     *        the item itself is returned as the result
     * @param listener listener notified around the processing of the item
     * @param item the item to process
     */
    public TransformTask(ItemProcessor<? super I, ? extends O> itemProcessor, MulticasterBatchListener<I, O> listener, I item){
        this.itemProcessor = itemProcessor;
        this.listener = listener;
        this.item = item;
    }

    /**
     * Processes the item.
     *
     * @return the processor's result, or the item itself cast to {@code O} when
     *         no processor was supplied
     * @throws Exception whatever the processor or the before/after listener
     *         callbacks throw, after the listener's {@code onProcessError} has been called
     */
    @Override
    public O call() throws Exception {
        if (itemProcessor == null) {
            // No processor configured: pass the item through unchanged. The cast is
            // unchecked and relies on the caller using compatible I and O types.
            @SuppressWarnings("unchecked")
            O result = (O) item;
            return result;
        }
        try {
            listener.beforeProcess(item);
            O result = itemProcessor.process(item);
            listener.afterProcess(item, result);
            return result;
        }
        catch (Exception e) {
            // Report the failure, then rethrow so it is recorded in the task's Future.
            listener.onProcessError(item, e);
            throw e;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment