Last active
August 29, 2015 14:04
-
-
Save dgomesbr/5b9ab4087555c5502876 to your computer and use it in GitHub Desktop.
Example of parallel processing on ConcurrentChunkProcessor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* same as SimpleChunkProcessor except otherwise shown | |
*/ | |
public class ConcurrentChunkProcessor<I, O> implements ChunkProcessor<I>, InitializingBean { | |
// ... | |
// example of parallel processing using Executor service, better create this by Spring and inject if available or | |
// use the same TaskExecutor from the Context | |
private static final int THREADPOOL_TIMEOUT_IN_SECONDS = 1; | |
private static final int THREADPOOL_PROCESSOR_NUMBER = Runtime.getRuntime().availableProcessors() * 2; | |
private final ExecutorService executorService = | |
new ThreadPoolExecutor( | |
THREADPOOL_PROCESSOR_NUMBER, // core thread pool size | |
THREADPOOL_PROCESSOR_NUMBER, // maximum thread pool size | |
THREADPOOL_TIMEOUT_IN_SECONDS, // time to wait before resizing pool | |
TimeUnit.SECONDS, | |
new ArrayBlockingQueue<Runnable>(THREADPOOL_PROCESSOR_NUMBER, true), | |
new ThreadPoolExecutor.CallerRunsPolicy()); | |
//... | |
// The new transform method, on my case I don't need the output generated by the process / doProcess | |
// hence I'm doing iterator.remove without knowledge of the result of doProcess() | |
// | |
protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception { | |
Chunk<O> outputs = new Chunk<O>(); | |
List<Future<O>> list = new ArrayList<Future<O>>(); | |
int itemFromChunk = 0; | |
int chunkSize = inputs.getItems().size(); | |
logger.info("Processing chunk[" + chunkSize + "]"); | |
//add for future | |
for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) { | |
itemFromChunk++; | |
final I item = iterator.next(); | |
Callable<O> transformTask = new TransformTask<O,I>(itemProcessor,listener,item); | |
Future<O> future = executorService.submit(transformTask); | |
list.add(future); | |
//removing it early since the future is only responsible to add the output back if it exists | |
iterator.remove(); | |
} | |
/* | |
//retrieve executed (blocking) | |
itemFromChunk = 0; | |
for(Future<O> future : list){ | |
itemFromChunk++; | |
O output = null; | |
try { | |
output = future.get(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} catch (ExecutionException e) { | |
e.printStackTrace(); | |
} catch (Exception e) { | |
inputs.clear(); | |
throw e; | |
} | |
if (output != null) { | |
outputs.add(output); | |
} | |
} | |
*/ | |
//dont shutdown just yet, remember to refactor this to outside and let spring handle it gracefully | |
//executorService.shutdown(); | |
return outputs; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class TransformTask<O,I> implements Callable<O> { | |
private ItemProcessor<? super I, ? extends O> itemProcessor; | |
private final MulticasterBatchListener<I, O> listener; | |
private final I item; | |
public TransformTask(ItemProcessor<? super I, ? extends O> itemProcessor, MulticasterBatchListener<I, O> listener, I item){ | |
this.itemProcessor = itemProcessor; | |
this.listener = listener; | |
this.item = item; | |
} | |
@Override | |
public O call() throws Exception { | |
if (itemProcessor == null) { | |
@SuppressWarnings("unchecked") | |
O result = (O) item; | |
return result; | |
} | |
try { | |
listener.beforeProcess(item); | |
O result = itemProcessor.process(item); | |
listener.afterProcess(item, result); | |
return result; | |
} | |
catch (Exception e) { | |
listener.onProcessError(item, e); | |
throw e; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment