Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Example of parallel processing on ConcurrentChunkProcessor
* same as SimpleChunkProcessor except otherwise shown
public class ConcurrentChunkProcessor<I, O> implements ChunkProcessor<I>, InitializingBean {
// ...
// example of parallel processing using Executor service, better create this by Spring and inject if available or
// use the same TaskExecutor from the Context
private static final int THREADPOOL_TIMEOUT_IN_SECONDS = 1;
private static final int THREADPOOL_PROCESSOR_NUMBER = Runtime.getRuntime().availableProcessors() * 2;
private final ExecutorService executorService =
new ThreadPoolExecutor(
THREADPOOL_PROCESSOR_NUMBER, // core thread pool size
THREADPOOL_PROCESSOR_NUMBER, // maximum thread pool size
THREADPOOL_TIMEOUT_IN_SECONDS, // time to wait before resizing pool
new ArrayBlockingQueue<Runnable>(THREADPOOL_PROCESSOR_NUMBER, true),
new ThreadPoolExecutor.CallerRunsPolicy());
// The new transform method, on my case I don't need the output generated by the process / doProcess
// hence I'm doing iterator.remove without knowledge of the result of doProcess()
protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception {
Chunk<O> outputs = new Chunk<O>();
List<Future<O>> list = new ArrayList<Future<O>>();
int itemFromChunk = 0;
int chunkSize = inputs.getItems().size();"Processing chunk[" + chunkSize + "]");
//add for future
for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
final I item =;
Callable<O> transformTask = new TransformTask<O,I>(itemProcessor,listener,item);
Future<O> future = executorService.submit(transformTask);
//removing it early since the future is only responsible to add the output back if it exists
//retrieve executed (blocking)
itemFromChunk = 0;
for(Future<O> future : list){
O output = null;
try {
output = future.get();
} catch (InterruptedException e) {
} catch (ExecutionException e) {
} catch (Exception e) {
throw e;
if (output != null) {
//dont shutdown just yet, remember to refactor this to outside and let spring handle it gracefully
return outputs;
public class TransformTask<O,I> implements Callable<O> {
private ItemProcessor<? super I, ? extends O> itemProcessor;
private final MulticasterBatchListener<I, O> listener;
private final I item;
public TransformTask(ItemProcessor<? super I, ? extends O> itemProcessor, MulticasterBatchListener<I, O> listener, I item){
this.itemProcessor = itemProcessor;
this.listener = listener;
this.item = item;
public O call() throws Exception {
if (itemProcessor == null) {
O result = (O) item;
return result;
try {
O result = itemProcessor.process(item);
listener.afterProcess(item, result);
return result;
catch (Exception e) {
listener.onProcessError(item, e);
throw e;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.