Skip to content

Instantly share code, notes, and snippets.

@debop
Created January 10, 2013 14:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save debop/4502435 to your computer and use it in GitHub Desktop.
Save debop/4502435 to your computer and use it in GitHub Desktop.
병렬로 프로그래밍 실행하기
public static <T, V> List<V> run(final Iterable<T> elements, final Function1<T, V> function) {
shouldNotBeNull(elements, "elements");
shouldNotBeNull(function, "function");
ExecutorService executor = Executors.newFixedThreadPool(PROCESS_COUNT);
final List<V> results = new ArrayList<V>();
if (log.isDebugEnabled())
log.debug("작업을 병렬로 수행합니다. 작업 스레드 수=[{}]", PROCESS_COUNT);
try {
List<T> elemList = Lists.newArrayList(elements);
List<List<T>> partitions = Lists.partition(elemList, PROCESS_COUNT);
final Map<Integer, List<V>> localResults = new LinkedHashMap<Integer, List<V>>();
List<Callable<List<V>>> tasks = new LinkedList<Callable<List<V>>>();
for (int p = 0; p < partitions.size(); p++) {
final List<T> partition = partitions.get(p);
final List<V> localResult = new ArrayList<V>();
localResults.put(p, localResult);
Callable<List<V>> task = new Callable<List<V>>() {
@Override
public List<V> call() throws Exception {
for (final T element : partition)
localResult.add(function.execute(element));
return localResult;
}
};
tasks.add(task);
}
executor.invokeAll(tasks);
for (int i = 0; i < partitions.size(); i++) {
results.addAll(localResults.get(i));
}
if (log.isDebugEnabled())
log.debug("모든 작업을 병렬로 완료했습니다. partitions=[{}]", partitions.size());
} catch (Exception e) {
log.error("데이터에 대한 병렬 작업 중 예외가 발생했습니다.", e);
} finally {
executor.shutdown();
}
return results;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment