-
-
Save elaPa/8bfa648806b47d236b99 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.aistemos.newchoreographer.gpars; | |
import com.google.common.collect.Lists; | |
import groovyx.gpars.DataflowMessagingRunnable; | |
import groovyx.gpars.dataflow.Dataflow; | |
import groovyx.gpars.dataflow.DataflowQueue; | |
import groovyx.gpars.dataflow.DataflowWriteChannel; | |
import groovyx.gpars.dataflow.operator.DataflowProcessor; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
/** | |
* @author ela.pawelczyk@aistemos.com | |
*/ | |
public class GParsTest { | |
private static final Logger LOGGER = LoggerFactory.getLogger(GParsTest.class); | |
public static final int SUB_PROCESS_COUNT = 8; | |
public static void main(String[] args) throws InterruptedException { | |
ExecutorService executorService = Executors.newFixedThreadPool(8); | |
for (int i = 0; i < 100000; i++) { | |
final int id = i; | |
executorService.execute(new Runnable() { | |
public void run() { | |
generateReport(id); | |
} | |
}); | |
} | |
executorService.shutdown(); | |
} | |
private static void generateReport(final int id) { | |
LOGGER.info("Report number: {}", id); | |
DataflowQueue done = new DataflowQueue<>(); | |
List<DataflowQueue> subReportsDone = new ArrayList<>(); | |
List<DataflowProcessor> processors = new ArrayList<>(); | |
for (int i = 0; i < SUB_PROCESS_COUNT; i++) { | |
final int subId = i; | |
DataflowQueue<String> subReportStartQueue = new DataflowQueue<>(); | |
DataflowQueue<String> subReportDoneQueue = new DataflowQueue<>(); | |
processors.add(Dataflow.operator(subReportStartQueue, subReportDoneQueue, new DataflowMessagingRunnable(1){ | |
@Override | |
protected void doRun(final Object... arguments) { | |
LOGGER.info("Generating sub report for {}", arguments[0]); | |
sleep(); | |
Object subReport = generateSubReport(String.format("report-%d-%d", id, subId)); | |
getOwningProcessor().bindAllOutputs(subReport); | |
} | |
})); | |
subReportsDone.add(subReportDoneQueue); | |
subReportStartQueue.bind(String.valueOf(i)); | |
} | |
processors.add(Dataflow.operator(subReportsDone, Lists.newArrayList(done), new Task2(SUB_PROCESS_COUNT, "Report done"))); | |
Object result = waitForValue(done); | |
terminateProcessors(processors); | |
LOGGER.info("Report number {} FINISHED: {}", id, result); | |
} | |
private static Object generateSubReport(String reportId) { | |
final DataflowQueue<String> init = new DataflowQueue<>(); | |
final DataflowQueue task1 = new DataflowQueue(); | |
final DataflowQueue task1Done = new DataflowQueue(); | |
final DataflowQueue task2 = new DataflowQueue(); | |
final DataflowQueue task2Done = new DataflowQueue(); | |
final DataflowQueue task3 = new DataflowQueue(); | |
final DataflowQueue task3Done = new DataflowQueue(); | |
final DataflowQueue task4 = new DataflowQueue(); | |
final DataflowQueue task4Done = new DataflowQueue(); | |
final DataflowQueue task5 = new DataflowQueue(); | |
final DataflowQueue task5Done = new DataflowQueue(); | |
final DataflowQueue done = new DataflowQueue<>(); | |
Dataflow.splitter(init, Lists.<DataflowWriteChannel>newArrayList(task1, task2, task3, task4, task5)); | |
List<DataflowProcessor> processors = new ArrayList<>(); | |
processors.add(Dataflow.operator(task1, task1Done, new Task(1, "task1"))); | |
processors.add(Dataflow.operator(task2, task2Done, new Task(1, "task2"))); | |
processors.add(Dataflow.operator(task3, task3Done, new Task(1, "task3"))); | |
processors.add(Dataflow.operator(task4, task4Done, new Task(1, "task4"))); | |
processors.add(Dataflow.operator(task5, task5Done, new Task(1, "task5"))); | |
processors.add(Dataflow.operator(Lists.newArrayList(task1Done, task2Done, task3Done, task4Done, task5Done), | |
Lists.newArrayList(done), new Task2(5, "DONE"))); | |
init.bind(reportId); | |
Object result = waitForValue(done); | |
LOGGER.info("REPORT {} FINISHED: {}", reportId, result); | |
terminateProcessors(processors); | |
return result; | |
} | |
private static void terminateProcessors(final List<DataflowProcessor> processors) { | |
for (DataflowProcessor processor : processors) { | |
processor.terminate(); | |
} | |
LOGGER.info("Terminated"); | |
} | |
private static void sleep() { | |
try { | |
Thread.sleep(2); | |
} catch (InterruptedException e) { | |
//ignore | |
} | |
} | |
public static class Task extends DataflowMessagingRunnable { | |
private final String name; | |
protected Task(final int numberOfParameters, final String name) { | |
super(numberOfParameters); | |
this.name = name; | |
} | |
@Override | |
protected void doRun(final Object... arguments) { | |
LOGGER.info("Starting {}: {}", arguments[0], name); | |
sleep(); | |
LOGGER.info("Done: {}", name); | |
getOwningProcessor().bindAllOutputs(String.format("%s - %s", arguments[0], name)); | |
} | |
} | |
public static class Task2 extends DataflowMessagingRunnable { | |
private final String name; | |
protected Task2(final int numberOfParameters, final String name) { | |
super(numberOfParameters); | |
this.name = name; | |
} | |
@Override | |
protected void doRun(final Object... arguments) { | |
LOGGER.info("Starting {}: {}", arguments[0], name); | |
sleep(); | |
getOwningProcessor().bindAllOutputs(Lists.newArrayList(arguments)); | |
LOGGER.info("Done: {}", name); | |
} | |
} | |
/** | |
* Waits for value to appear in stream. | |
* | |
* @param stream The stream to try to read from | |
*/ | |
public static <T> T waitForValue(final DataflowQueue<T> stream) { | |
try { | |
LOGGER.info("Waiting for results"); | |
T val = stream.getVal(); | |
LOGGER.info("Got result"); | |
return val; | |
} catch (InterruptedException e) { | |
throw new RuntimeException("Interrupted when retrieving the result", e); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Requires: Java 1.7
Other dependencies:
'org.codehaus.gpars:gpars:1.2.1'
'org.codehaus.groovy:groovy-all:2.4.1'
'org.slf4j:slf4j-api:1.7.5'
'ch.qos.logback:logback-classic:1.1.1'
'org.slf4j:jcl-over-slf4j:1.7.5'
'com.google.guava:guava' (version not stated; needed for the com.google.common.collect.Lists import)