Last active
December 18, 2015 08:09
-
-
Save sawano/5751721 to your computer and use it in GitHub Desktop.
Java example of fold/reduce in Akka.
The point of interest is ParentActor.totalResult(List<Future<Object>> answers), which folds the workers' replies into a single Result.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package se.sawano.akka.example.java.reduce; | |
import java.io.Serializable; | |
/**
 * Marker message with no payload.
 *
 * <p>In this example it is sent to {@code ParentActor} to start a run, and by
 * {@code ParentActor} to each {@code WorkerActor} to request that worker's result.
 */
public class Begin implements Serializable {

    // Explicit version id so the serialized form does not depend on the
    // compiler-generated default.
    private static final long serialVersionUID = 1L;
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package se.sawano.akka.example.java.reduce; | |
import akka.actor.*; | |
import akka.japi.Function2; | |
import akka.testkit.TestProbe; | |
import scala.concurrent.Await; | |
import scala.concurrent.Future; | |
import scala.concurrent.duration.Duration; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.TimeUnit; | |
import static akka.dispatch.Futures.fold; | |
import static akka.pattern.Patterns.ask; | |
public class ParentActor extends UntypedActor { | |
private final int numberOfWorkers; | |
public ParentActor(int numberOfWorkers) { | |
this.numberOfWorkers = numberOfWorkers; | |
} | |
@Override | |
public void onReceive(Object message) throws Exception { | |
if (Begin.class.isInstance(message)) { | |
final List<Future<Object>> answers = askWorkers(); | |
getSender().tell(totalResult(answers), getSelf()); | |
} | |
else { | |
unhandled(message); | |
} | |
} | |
private List<Future<Object>> askWorkers() { | |
final ArrayList<Future<Object>> answers = new ArrayList<>(); | |
for (int i = 0; i < numberOfWorkers; ++i) { | |
answers.add(ask(createWorkerWithName("worker-" + i), new Begin(), 1000)); | |
} | |
return answers; | |
} | |
private Result totalResult(List<Future<Object>> answers) throws Exception { | |
final Future<Result> totalResult = fold(new Result(0), answers, new Function2<Result, Object, Result>() { | |
@Override | |
public Result apply(Result previousResult, Object workerResponse) throws Exception { | |
return new Result(previousResult.result + (Integer) workerResponse); | |
} | |
}, getContext().dispatcher()); | |
return Await.result(totalResult, Duration.create(2, TimeUnit.SECONDS)); | |
} | |
private ActorRef createWorkerWithName(String name) { | |
return getContext().actorOf(new Props(WorkerActor.class), name); | |
} | |
public static void main(String... args) { | |
final int NUMBER_OF_WORKERS = 10; | |
final ActorSystem system = ActorSystem.create(); | |
final TestProbe probe = TestProbe.apply(system); | |
final ActorRef parentActor = system.actorOf(new Props(new UntypedActorFactory() { | |
@Override | |
public Actor create() throws Exception { | |
return new ParentActor(NUMBER_OF_WORKERS); | |
} | |
}), "parentActor"); | |
parentActor.tell(new Begin(), probe.ref()); | |
final Result result = probe.expectMsgClass(Duration.create(5, TimeUnit.SECONDS), Result.class); | |
System.out.println("Expected: " + NUMBER_OF_WORKERS * 2 + " and got: " + result.result); | |
system.shutdown(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package se.sawano.akka.example.java.reduce; | |
import java.io.Serializable; | |
/**
 * Immutable message carrying an integer result.
 *
 * <p>Used both as the accumulator while folding worker replies and as the final
 * reply sent back by {@code ParentActor}.
 */
public class Result implements Serializable {

    // Explicit version id so the serialized form does not depend on the
    // compiler-generated default.
    private static final long serialVersionUID = 1L;

    /** The carried value; never {@code null}. */
    public final Integer result;

    /**
     * @param result the value to carry; must not be {@code null}
     * @throws NullPointerException if {@code result} is {@code null}
     */
    public Result(Integer result) {
        // Fail fast here instead of with an unboxing NPE when the value is later summed.
        this.result = java.util.Objects.requireNonNull(result, "result");
    }

    @Override
    public String toString() {
        return "Result(" + result + ")";
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package se.sawano.akka.example.java.reduce; | |
import akka.actor.UntypedActor; | |
public class WorkerActor extends UntypedActor { | |
@Override | |
public void onReceive(Object message) throws Exception { | |
if (Begin.class.isInstance(message)) { | |
getSender().tell(two(), getSelf()); | |
} | |
else { | |
unhandled(message); | |
} | |
} | |
private Integer two() { | |
return Integer.valueOf(2); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment