|
package se.sawano.akka.example.java.reduce; |
|
|
|
import akka.actor.*; |
|
import akka.japi.Function2; |
|
import akka.testkit.TestProbe; |
|
import scala.concurrent.Await; |
|
import scala.concurrent.Future; |
|
import scala.concurrent.duration.Duration; |
|
|
|
import java.util.ArrayList; |
|
import java.util.List; |
|
import java.util.concurrent.TimeUnit; |
|
|
|
import static akka.dispatch.Futures.fold; |
|
import static akka.pattern.Patterns.ask; |
|
|
|
public class ParentActor extends UntypedActor { |
|
|
|
private final int numberOfWorkers; |
|
|
|
public ParentActor(int numberOfWorkers) { |
|
this.numberOfWorkers = numberOfWorkers; |
|
} |
|
|
|
@Override |
|
public void onReceive(Object message) throws Exception { |
|
if (Begin.class.isInstance(message)) { |
|
final List<Future<Object>> answers = askWorkers(); |
|
getSender().tell(totalResult(answers), getSelf()); |
|
} |
|
else { |
|
unhandled(message); |
|
} |
|
} |
|
|
|
private List<Future<Object>> askWorkers() { |
|
final ArrayList<Future<Object>> answers = new ArrayList<>(); |
|
for (int i = 0; i < numberOfWorkers; ++i) { |
|
answers.add(ask(createWorkerWithName("worker-" + i), new Begin(), 1000)); |
|
} |
|
return answers; |
|
} |
|
|
|
private Result totalResult(List<Future<Object>> answers) throws Exception { |
|
final Future<Result> totalResult = fold(new Result(0), answers, new Function2<Result, Object, Result>() { |
|
@Override |
|
public Result apply(Result previousResult, Object workerResponse) throws Exception { |
|
return new Result(previousResult.result + (Integer) workerResponse); |
|
} |
|
}, getContext().dispatcher()); |
|
|
|
return Await.result(totalResult, Duration.create(2, TimeUnit.SECONDS)); |
|
} |
|
|
|
private ActorRef createWorkerWithName(String name) { |
|
return getContext().actorOf(new Props(WorkerActor.class), name); |
|
} |
|
|
|
public static void main(String... args) { |
|
final int NUMBER_OF_WORKERS = 10; |
|
final ActorSystem system = ActorSystem.create(); |
|
final TestProbe probe = TestProbe.apply(system); |
|
|
|
final ActorRef parentActor = system.actorOf(new Props(new UntypedActorFactory() { |
|
@Override |
|
public Actor create() throws Exception { |
|
return new ParentActor(NUMBER_OF_WORKERS); |
|
} |
|
}), "parentActor"); |
|
|
|
parentActor.tell(new Begin(), probe.ref()); |
|
|
|
final Result result = probe.expectMsgClass(Duration.create(5, TimeUnit.SECONDS), Result.class); |
|
|
|
System.out.println("Expected: " + NUMBER_OF_WORKERS * 2 + " and got: " + result.result); |
|
|
|
system.shutdown(); |
|
} |
|
} |