Skip to content

Instantly share code, notes, and snippets.

@sawano
Last active December 18, 2015 08:09
Show Gist options
  • Save sawano/5751721 to your computer and use it in GitHub Desktop.
Save sawano/5751721 to your computer and use it in GitHub Desktop.
Java example of fold/reduce in Akka. Point of interest is totalResult(List<Future<Object>> answers).
package se.sawano.akka.example.java.reduce;
import java.io.Serializable;
public class Begin implements Serializable {
}
package se.sawano.akka.example.java.reduce;
import akka.actor.*;
import akka.japi.Function2;
import akka.testkit.TestProbe;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static akka.dispatch.Futures.fold;
import static akka.pattern.Patterns.ask;
public class ParentActor extends UntypedActor {
private final int numberOfWorkers;
public ParentActor(int numberOfWorkers) {
this.numberOfWorkers = numberOfWorkers;
}
@Override
public void onReceive(Object message) throws Exception {
if (Begin.class.isInstance(message)) {
final List<Future<Object>> answers = askWorkers();
getSender().tell(totalResult(answers), getSelf());
}
else {
unhandled(message);
}
}
private List<Future<Object>> askWorkers() {
final ArrayList<Future<Object>> answers = new ArrayList<>();
for (int i = 0; i < numberOfWorkers; ++i) {
answers.add(ask(createWorkerWithName("worker-" + i), new Begin(), 1000));
}
return answers;
}
private Result totalResult(List<Future<Object>> answers) throws Exception {
final Future<Result> totalResult = fold(new Result(0), answers, new Function2<Result, Object, Result>() {
@Override
public Result apply(Result previousResult, Object workerResponse) throws Exception {
return new Result(previousResult.result + (Integer) workerResponse);
}
}, getContext().dispatcher());
return Await.result(totalResult, Duration.create(2, TimeUnit.SECONDS));
}
private ActorRef createWorkerWithName(String name) {
return getContext().actorOf(new Props(WorkerActor.class), name);
}
public static void main(String... args) {
final int NUMBER_OF_WORKERS = 10;
final ActorSystem system = ActorSystem.create();
final TestProbe probe = TestProbe.apply(system);
final ActorRef parentActor = system.actorOf(new Props(new UntypedActorFactory() {
@Override
public Actor create() throws Exception {
return new ParentActor(NUMBER_OF_WORKERS);
}
}), "parentActor");
parentActor.tell(new Begin(), probe.ref());
final Result result = probe.expectMsgClass(Duration.create(5, TimeUnit.SECONDS), Result.class);
System.out.println("Expected: " + NUMBER_OF_WORKERS * 2 + " and got: " + result.result);
system.shutdown();
}
}
package se.sawano.akka.example.java.reduce;
import java.io.Serializable;
public class Result implements Serializable {
public final Integer result;
public Result(Integer result) {
this.result = result;
}
}
package se.sawano.akka.example.java.reduce;
import akka.actor.UntypedActor;
public class WorkerActor extends UntypedActor {
@Override
public void onReceive(Object message) throws Exception {
if (Begin.class.isInstance(message)) {
getSender().tell(two(), getSelf());
}
else {
unhandled(message);
}
}
private Integer two() {
return Integer.valueOf(2);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment