Skip to content

Instantly share code, notes, and snippets.

@elaPa
Created March 11, 2015 13:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save elaPa/8bfa648806b47d236b99 to your computer and use it in GitHub Desktop.
Save elaPa/8bfa648806b47d236b99 to your computer and use it in GitHub Desktop.
package com.aistemos.newchoreographer.gpars;
import com.google.common.collect.Lists;
import groovyx.gpars.DataflowMessagingRunnable;
import groovyx.gpars.dataflow.Dataflow;
import groovyx.gpars.dataflow.DataflowQueue;
import groovyx.gpars.dataflow.DataflowWriteChannel;
import groovyx.gpars.dataflow.operator.DataflowProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author ela.pawelczyk@aistemos.com
*/
public class GParsTest {
// SLF4J logger shared by the static methods of this class and the nested Task/Task2 classes.
private static final Logger LOGGER = LoggerFactory.getLogger(GParsTest.class);
// Number of sub-report operators generateReport starts per report; also the input
// count of the final collecting operator in generateReport.
public static final int SUB_PROCESS_COUNT = 8;
/**
 * Entry point: hands 100000 report-generation jobs to a fixed pool of 8 threads and
 * then requests an orderly shutdown of that pool.
 *
 * @param args command-line arguments (not read)
 * @throws InterruptedException declared by the signature; no statement in this body throws it
 */
public static void main(String[] args) throws InterruptedException {
    final ExecutorService pool = Executors.newFixedThreadPool(8);
    int reportId = 0;
    while (reportId < 100000) {
        pool.execute(reportJob(reportId));
        reportId++;
    }
    // shutdown() does not wait: jobs that were already submitted still run.
    pool.shutdown();
}

/** Wraps one generateReport call for the given report number in a Runnable. */
private static Runnable reportJob(final int reportId) {
    return new Runnable() {
        @Override
        public void run() {
            generateReport(reportId);
        }
    };
}
/**
 * Generates one report: starts {@link #SUB_PROCESS_COUNT} sub-report operators, feeds
 * each a start message, and blocks until a final collecting operator has received all
 * of their results.
 *
 * <p>Every operator created here is terminated before the method returns, including
 * when building the network or waiting for the result fails, so a failed report does
 * not leave live operators behind.
 *
 * @param id the report number, used in log output and in the sub-report identifiers
 */
private static void generateReport(final int id) {
    LOGGER.info("Report number: {}", id);
    final DataflowQueue<Object> done = new DataflowQueue<>();
    final List<DataflowQueue<Object>> subReportsDone = new ArrayList<>();
    final List<DataflowProcessor> processors = new ArrayList<>();
    final Object result;
    try {
        for (int i = 0; i < SUB_PROCESS_COUNT; i++) {
            final int subId = i;
            final DataflowQueue<String> subReportStartQueue = new DataflowQueue<>();
            // Typed Object because generateSubReport returns Object, not String.
            final DataflowQueue<Object> subReportDoneQueue = new DataflowQueue<>();
            processors.add(Dataflow.operator(subReportStartQueue, subReportDoneQueue, new DataflowMessagingRunnable(1) {
                @Override
                protected void doRun(final Object... arguments) {
                    LOGGER.info("Generating sub report for {}", arguments[0]);
                    sleep();
                    Object subReport = generateSubReport(String.format("report-%d-%d", id, subId));
                    getOwningProcessor().bindAllOutputs(subReport);
                }
            }));
            subReportsDone.add(subReportDoneQueue);
            // The start message is the single input the operator above is waiting for.
            subReportStartQueue.bind(String.valueOf(i));
        }
        processors.add(Dataflow.operator(subReportsDone, Lists.newArrayList(done), new Task2(SUB_PROCESS_COUNT, "Report done")));
        result = waitForValue(done);
    } finally {
        // Runs on success and on failure so operators are never left running.
        terminateProcessors(processors);
    }
    LOGGER.info("Report number {} FINISHED: {}", id, result);
}
/**
 * Builds and runs the dataflow network for a single sub-report: a splitter copies the
 * report id from the init queue to five task queues, one {@link Task} operator handles
 * each of them, and a final {@link Task2} operator with five inputs writes to the
 * queue this method waits on.
 *
 * <p>All processors, including the splitter, are terminated before the method returns,
 * even when building the network or waiting for the result fails.
 *
 * @param reportId identifier bound to the init queue and echoed in log output
 * @return the value read from the final queue
 */
private static Object generateSubReport(String reportId) {
    final DataflowQueue<String> init = new DataflowQueue<>();
    final DataflowQueue<String> task1 = new DataflowQueue<>();
    final DataflowQueue<String> task1Done = new DataflowQueue<>();
    final DataflowQueue<String> task2 = new DataflowQueue<>();
    final DataflowQueue<String> task2Done = new DataflowQueue<>();
    final DataflowQueue<String> task3 = new DataflowQueue<>();
    final DataflowQueue<String> task3Done = new DataflowQueue<>();
    final DataflowQueue<String> task4 = new DataflowQueue<>();
    final DataflowQueue<String> task4Done = new DataflowQueue<>();
    final DataflowQueue<String> task5 = new DataflowQueue<>();
    final DataflowQueue<String> task5Done = new DataflowQueue<>();
    final DataflowQueue<Object> done = new DataflowQueue<>();
    final List<DataflowProcessor> processors = new ArrayList<>();
    try {
        // The splitter is a processor as well; keep it so it is terminated with the rest.
        processors.add(Dataflow.splitter(init, Lists.<DataflowWriteChannel>newArrayList(task1, task2, task3, task4, task5)));
        processors.add(Dataflow.operator(task1, task1Done, new Task(1, "task1")));
        processors.add(Dataflow.operator(task2, task2Done, new Task(1, "task2")));
        processors.add(Dataflow.operator(task3, task3Done, new Task(1, "task3")));
        processors.add(Dataflow.operator(task4, task4Done, new Task(1, "task4")));
        processors.add(Dataflow.operator(task5, task5Done, new Task(1, "task5")));
        processors.add(Dataflow.operator(Lists.newArrayList(task1Done, task2Done, task3Done, task4Done, task5Done),
                Lists.newArrayList(done), new Task2(5, "DONE")));
        init.bind(reportId);
        Object result = waitForValue(done);
        LOGGER.info("REPORT {} FINISHED: {}", reportId, result);
        return result;
    } finally {
        // Runs on success and on failure so no processor outlives this call.
        terminateProcessors(processors);
    }
}
/**
 * Calls terminate() on every processor in list order, then logs one "Terminated" line.
 *
 * @param processors the processors to terminate
 */
private static void terminateProcessors(final List<DataflowProcessor> processors) {
    for (int index = 0; index < processors.size(); index++) {
        final DataflowProcessor current = processors.get(index);
        current.terminate();
    }
    LOGGER.info("Terminated");
}
/**
 * Pauses the calling thread for 2 ms (presumably a stand-in for real work).
 *
 * <p>If the thread is interrupted while sleeping, the pause ends early and the
 * interrupt flag is set again so that callers and later blocking calls can still
 * observe the interruption.
 */
private static void sleep() {
    try {
        Thread.sleep(2);
    } catch (InterruptedException e) {
        // Thread.sleep clears the flag when it throws; restore it instead of losing the interrupt.
        Thread.currentThread().interrupt();
    }
}
/**
 * Operator body that logs its first argument, pauses via sleep(), and then binds the
 * text {@code "<first argument> - <name>"} to all outputs of the owning processor.
 */
public static class Task extends DataflowMessagingRunnable {
    private final String name;

    /**
     * @param numberOfParameters value forwarded to the superclass constructor
     * @param name label used in the log lines and appended to the bound result
     */
    protected Task(final int numberOfParameters, final String name) {
        super(numberOfParameters);
        this.name = name;
    }

    @Override
    protected void doRun(final Object... arguments) {
        final Object input = arguments[0];
        LOGGER.info("Starting {}: {}", input, name);
        sleep();
        LOGGER.info("Done: {}", name);
        final String outcome = String.format("%s - %s", input, name);
        getOwningProcessor().bindAllOutputs(outcome);
    }
}
/**
 * Operator body that logs its first argument, pauses via sleep(), and then binds a
 * list holding all of its arguments to all outputs of the owning processor.
 */
public static class Task2 extends DataflowMessagingRunnable {
    private final String name;

    /**
     * @param numberOfParameters value forwarded to the superclass constructor
     * @param name label used in the log lines
     */
    protected Task2(final int numberOfParameters, final String name) {
        super(numberOfParameters);
        this.name = name;
    }

    @Override
    protected void doRun(final Object... arguments) {
        LOGGER.info("Starting {}: {}", arguments[0], name);
        sleep();
        final List<Object> collected = Lists.newArrayList(arguments);
        getOwningProcessor().bindAllOutputs(collected);
        // Unlike Task, the "Done" line is logged after the outputs are bound.
        LOGGER.info("Done: {}", name);
    }
}
/**
 * Waits for a value to appear in the stream and returns it, logging before and after
 * the wait.
 *
 * @param stream The stream to try to read from
 * @param <T> type of the values carried by the stream
 * @return the value read from the stream
 * @throws RuntimeException if the calling thread is interrupted while waiting; the
 *         thread's interrupt flag is set again before the exception is thrown and the
 *         original InterruptedException is kept as the cause
 */
public static <T> T waitForValue(final DataflowQueue<T> stream) {
    try {
        LOGGER.info("Waiting for results");
        T val = stream.getVal();
        LOGGER.info("Got result");
        return val;
    } catch (InterruptedException e) {
        // Restore the flag cleared by the throw so code further up can still see the interrupt.
        Thread.currentThread().interrupt();
        throw new RuntimeException("Interrupted when retrieving the result", e);
    }
}
}
@elaPa
Copy link
Author

elaPa commented Mar 11, 2015

requires: java 1.7

other dependencies:
'org.codehaus.gpars:gpars:1.2.1'
'org.codehaus.groovy:groovy-all:2.4.1'

"org.slf4j:slf4j-api:1.7.5"
"ch.qos.logback:logback-classic:1.1.1"
"org.slf4j:jcl-over-slf4j:1.7.5"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment