Skip to content

Instantly share code, notes, and snippets.

@Groostav
Created October 3, 2015 06:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Groostav/92054036601fab832a37 to your computer and use it in GitHub Desktop.
Save Groostav/92054036601fab832a37 to your computer and use it in GitHub Desktop.
# Base Akka settings: log level plus message serialization.
# NOTE(review): presumably this is the file that `include "base"` in the cluster configuration refers to -- confirm the file name.
akka {
loglevel = "INFO"
actor {
# Declares a serializer under the short name `xstream`, backed by the project's XStream XML serializer class.
serializers {
xstream = "com.empowerops.common.AKKAXStreamXMLSerializer"
}
# Maps message types (by class name) to the serializer names declared above.
serialization-bindings {
# Every object is bound to the `xstream` serializer.
"java.lang.Object" = xstream
# NOTE(review): `none` presumably switches off Akka's default binding for java.io.Serializable so that
# it cannot compete with the java.lang.Object binding -- confirm against the Akka serialization docs.
"java.io.Serializable" = none
}
}
}
# Cluster settings shared by every node of the "OASIS-ET-cluster" actor system.
# Node-specific files include this one and add their own port and role.
include "base"

akka {
  log-dead-letters = on

  actor {
    provider = "akka.cluster.ClusterActorRefProvider"

    deployment {
      # Configuration for the child actor named "router" that JobDispatcherActor
      # creates through FromConfig.props().
      /JobDispatcherActor/router = {
        router = adaptive-group
        metrics-selector = mix
        nr-of-instances = 100

        # The routees are the worker actors. ProxyActor.main starts the worker as
        # /user/ProxyActor on a node whose cluster.roles contains "ProxyActor".
        # The previous values ("/user/JobDispatcherActor" with use-role = backend)
        # matched no visible node, because no configuration declares a "backend" role.
        # NOTE(review): confirm no other node configuration outside this file set
        # declares the "backend" role.
        routees.paths = ["/user/ProxyActor"]

        cluster {
          enabled = on
          use-role = ProxyActor
          allow-local-routees = off
        }
      }
    }
  }

  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
    }
    log-sent-messages = on
    log-received-messages = on
    log-remote-lifecycle-events = on
  }

  cluster {
    # Must match the system name, host and port of the node that runs ProxyActor.
    seed-nodes = [
      "akka.tcp://OASIS-ET-cluster@127.0.0.1:2553"
    ]
    auto-down-unreachable-after = 10s
    min-nr-of-members = 1

    # Per-role minimums are read from akka.cluster.role.<role-name>; this section
    # used to sit directly under `akka`, where Akka does not look for it.
    role {
      ProxyActor.min-nr-of-members = 1
      JobDispatcherActor.min-nr-of-members = 1
    }
  }
}
package com.empowerops.grid2
import com.empowerops.algorithms.SymbolTable
import com.empowerops.common.ModelEvent
import com.empowerops.front_end.model.ExternalToolProxyNode
/** Common parent of every message type declared in this file. */
sealed trait AKKAMessage

/** Parent of the two messages that answer an [[ExecuteRequestDTO]]. */
sealed trait AKKAExecuteResponseMessage extends AKKAMessage

/** Wraps a [[ModelEvent]] so it can be sent between actors. */
case class ModelEventDTO(event: ModelEvent) extends AKKAMessage

// Earlier request shape, kept for reference:
// class OptionDTO(option: String, arg: String)
// case class ExecuteRequestDTO(executable: Path, options: Set[OptionDTO], inputFile: Path, outputFile: Path, timeout: time.Duration, inputsByName: Map[String, Double])

/** Request to evaluate `node` against the values in `symbolTable`. */
case class ExecuteRequestDTO(node: ExternalToolProxyNode, symbolTable: SymbolTable) extends AKKAMessage

/** Successful reply, carrying the symbol table after the job has run. */
case class ExecuteResponseDTO(symbolTable: SymbolTable) extends AKKAExecuteResponseMessage

/** Failure reply, carrying whatever was thrown while the job was created or run. */
case class ExecuteErrorDTO(exception: Throwable) extends AKKAExecuteResponseMessage
package com.empowerops.grid2;
import com.empowerops.algorithms.SymbolTable;
import com.empowerops.common.ModelEvent;
import com.empowerops.common.assists.CommonFixtureBase;
import com.empowerops.common.assists.Count;
import com.empowerops.common.assists.CountingEventBus;
import com.empowerops.common.documentation.ReflectionSensitive;
import com.empowerops.common.model.VariableSymbol;
import com.empowerops.front_end.execution.ExternalToolExecutorJob;
import com.empowerops.front_end.model.ExternalToolProxyNode;
import com.empowerops.linqalike.Queryable;
import com.google.inject.Module;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static com.empowerops.assists.Exceptions.assertThrows;
import static com.empowerops.common.BootstrappingUtilities.getParamName;
import static com.empowerops.common.BootstrappingUtilities.popProperty;
import static com.empowerops.common.BootstrappingUtilities.pushProperty;
import static com.empowerops.common.model.VariableSymbolBuilder.*;
import static com.empowerops.linqalike.common.Tuple.pair;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.*;
/**
* Acceptance tests that construct a {@link com.empowerops.grid2.Host} and call
* {@code execute} on it, with the {@code ExternalToolExecutorJob.Factory} replaced by a Mockito mock.
* <p>
* Please note this test system tests a number of classes, namely:
* <ul>
* <li>{@link com.empowerops.grid2.Host}</li>
* <li>{@link com.empowerops.grid2.ProxyActor}</li>
* <li>{@link com.empowerops.grid2.JobDispatcherActor}</li>
* </ul>
*
* Created by Geoff on 2015-09-11.
*/
public class ExternalToolActorSystemAcceptanceTests extends CommonFixtureBase{
// Output variable produced by the node under test.
private static final VariableSymbol Y1 = objective("Y1").build();
// Three input variables consumed by the node under test.
private static final Queryable<VariableSymbol> inputs = inputs("in1", "in2", "in3").build();
private static final Class<ProxyActor> PAClass = ProxyActor.class;
// Only referenced from the commented-out ConfigComPath lines below.
private static final Class<Host> HostClass = Host.class;
private ExternalToolProxyNode node;
private SymbolTable runningTable;
// These two are static and named by their fully-qualified field name in setup_akka_config().
// NOTE(review): presumably ProxyActor resolves those names reflectively, hence @ReflectionSensitive;
// renaming or moving the fields would then break the lookup -- confirm in BootstrappingUtilities.
private static @ReflectionSensitive CountingEventBus eventBusForTest;
private static @ReflectionSensitive Module moduleForTest;
// Mock factory bound into moduleForTest; each test stubs create(...) on it.
private ExternalToolExecutorJob.Factory factory;
/** Builds a node that consumes the three inputs and produces Y1, plus a table holding 1.0, 2.0 and 3.0 for the inputs. */
@Before
public void setup_node_and_some_simple_vars() {
node = new ExternalToolProxyNode();
node.getProducedVariables().add(Y1);
node.getConsumedVariables().addAll(inputs);
runningTable = new SymbolTable();
runningTable.addAll(pair(inputs.first(), 1.0), pair(inputs.second(), 2.0), pair(inputs.last(), 3.0));
}
/**
* Creates the mock job factory, a Guice module that binds it, and a counting event bus, then
* pushes the names of the static fields holding the module and the bus as the ProxyActor
* "module" and "eventBus" parameters.
*/
@Before
public void setup_akka_config() {
factory = mock(ExternalToolExecutorJob.Factory.class);
moduleForTest = binder -> binder.bind(ExternalToolExecutorJob.Factory.class).toInstance(factory);
eventBusForTest = new CountingEventBus();
pushProperty(getParamName(PAClass, "module"), "com.empowerops.grid2.ExternalToolActorSystemAcceptanceTests.moduleForTest");
pushProperty(getParamName(PAClass, "eventBus"), "com.empowerops.grid2.ExternalToolActorSystemAcceptanceTests.eventBusForTest");
// pushProperty(getParamName(PAClass, "ConfigComPath"), "/com/empowerops/grid2/base.conf");
// pushProperty(getParamName(HostClass, "ConfigComPath"), "/com/empowerops/grid2/base.conf");
}
/** Pops the parameters pushed by setup_akka_config(), in the reverse of the order they were pushed. */
@After
public void remove_akka_config_from_jvm() {
// popProperty(getParamName(HostClass, "ConfigComPath"));
// popProperty(getParamName(PAClass, "ConfigComPath"));
popProperty(getParamName(PAClass, "eventBus"));
popProperty(getParamName(PAClass, "module"));
}
/** A job that writes Y1 = 42.0 when run: the factory and the job are each used once and the returned table holds 42.0 for Y1. */
@Test
public void when_dispatching_a_valid_job_should_create_and_run_ET_execution_job() {
//setup
ExternalToolExecutorJob job = mock(ExternalToolExecutorJob.class);
when(factory.create(eq(node), eq(runningTable))).thenReturn(job);
// running the job stores the result into the table that was handed to the factory
doAnswer(invoc -> runningTable.put(Y1, 42.0)).when(job).run();
// NOTE(review): eventBus and resourceEnvironmentFactory are not declared in this class; presumably inherited from CommonFixtureBase.
Host host = new Host(eventBus, resourceEnvironmentFactory);
//act
SymbolTable result = host.execute(node, runningTable);
//assert
verify(factory, once()).create(eq(node), eq(runningTable));
verify(job, once()).run();
assertThat(result.getValueFor(node.getProducedVariables().single())).isEqualTo(42.0);
}
/** A job whose run() throws TestException: host.execute must throw TestException to the caller. */
@Test
@SuppressWarnings("ThrowableResultOfMethodCallIgnored") // part of the test
public void when_dispatching_a_job_that_throws_exception_should_be_exposed_to_caller(){
//setup
ExternalToolExecutorJob job = mock(ExternalToolExecutorJob.class);
when(factory.create(eq(node), eq(runningTable))).thenReturn(job);
doThrow(new TestException()).when(job).run();
Host host = new Host(eventBus, resourceEnvironmentFactory);
//act & a little assert
assertThrows(TestException.class, () -> host.execute(node, runningTable));
//more assert
verify(factory, once()).create(eq(node), eq(runningTable));
verify(job, once()).run();
}
/** Marker exception thrown by the mocked job in the failure test. */
public static class TestException extends RuntimeException{
private static final long serialVersionUID = 4185502332377683589L;
}
/** A job that posts a NeatoEvent on eventBusForTest while running: the bus handed to Host must be asked to post it once. */
@Test
public void when_dispatching_a_job_that_posts_an_event_we_should_recieve_that_event(){
//setup
ExternalToolExecutorJob job = mock(ExternalToolExecutorJob.class);
when(factory.create(eq(node), eq(runningTable))).thenReturn(job);
// the job posts on the bus that was pushed as ProxyActor's "eventBus" parameter
doAnswer(invoc -> { eventBusForTest.post(new NeatoEvent()); return null; }).when(job).run();
Host host = new Host(eventBus, resourceEnvironmentFactory);
//act
host.execute(node, runningTable);
//assert
verify(factory, once()).create(node, runningTable);
verify(job, once()).run();
eventBus.shouldHaveBeenAskedToPost(Count.Once, NeatoEvent.class);
}
/** Empty ModelEvent used as the payload in the event tests. */
public static class NeatoEvent implements ModelEvent {}
/** The opposite direction: a NeatoEvent posted on the bus handed to Host must be posted once on eventBusForTest. */
@Test
public void when_posting_an_event_locally_that_event_makes_it_to_all_actors(){
//setup
when(factory.create(eq(node), eq(runningTable))).thenReturn(mock(ExternalToolExecutorJob.class));
Host host = new Host(eventBus, resourceEnvironmentFactory);
//act
// a job is executed first, and only then is the event posted
host.execute(node, runningTable);
eventBus.post(new NeatoEvent());
//assert
eventBusForTest.shouldHaveBeenAskedToPost(Count.Once, NeatoEvent.class);
}
}
# Node configuration that adds the JobDispatcherActor role on top of the shared cluster settings.
# NOTE(review): presumably this is Host.conf, the default resource Host loads -- confirm the file name.
include "cluster"
akka{
# NOTE(review): port 0 presumably asks Akka remoting to pick a free port at startup -- confirm against the Akka remoting docs.
remote.netty.tcp.port=0
# Role this node announces to the cluster.
cluster.roles = [
JobDispatcherActor
]
}
package com.empowerops.grid2
import java.util.concurrent.ArrayBlockingQueue
import akka.actor.{ActorRef, ActorSystem, Props}
import com.empowerops.algorithms.SymbolTable
import com.empowerops.common.persistence.ResourceEnvironment
import com.empowerops.common.{BootstrappingUtilities, EventBus}
import com.empowerops.front_end.model.ExternalToolProxyNode
import com.google.inject.Inject
import com.typesafe.config.ConfigFactory
/**
* Created by Geoff on 2015-09-12.
*/
/**
 * Synchronous entry point to the "OASIS-ET-cluster" actor system.
 *
 * Constructing a Host loads the configuration found at [[Host.ConfigComPath]], starts an
 * actor system from it and creates one [[JobDispatcherActor]] named "JobDispatcherActor".
 *
 * Created by Geoff on 2015-09-12.
 *
 * @param eventBus   bus handed to the dispatcher actor
 * @param envFactory used to obtain the resource environment the configuration is loaded from
 */
class Host @Inject() (eventBus: EventBus, envFactory: ResourceEnvironment.Factory) {

  // Single-slot hand-off: the dispatcher actor offers the response, execute() takes it.
  val completionSignal = new ArrayBlockingQueue[AKKAExecuteResponseMessage](1)

  // Actor system bootstrapped from the configured resource.
  val env = envFactory.forServiceWithoutView(classOf[Host])
  val config = ConfigFactory.parseFile(env.loadResourceAt(Host.ConfigComPath).asFile)
  val system = ActorSystem("OASIS-ET-cluster", config)

  // NOTE(review): nothing visible reassigns this, so it could become a val;
  // it is left as a var to keep the public setter.
  var dispatcherActor: ActorRef = system.actorOf(
    Props(new JobDispatcherActor(eventBus, completionSignal)),
    "JobDispatcherActor"
  )

  /**
   * Sends an execute request to the dispatcher and blocks, without a timeout,
   * until a response shows up on `completionSignal`.
   *
   * @param node        the node to evaluate
   * @param symbolTable the values handed to the job
   * @return the symbol table carried by the response
   * @throws Throwable whatever exception an [[ExecuteErrorDTO]] response carries
   * @throws IllegalStateException if the queue hands back `null`
   */
  def execute(node: ExternalToolProxyNode, symbolTable: SymbolTable): SymbolTable = {
    dispatcherActor ! ExecuteRequestDTO(node, symbolTable)

    // val result = completionSignal.poll(5, TimeUnit.SECONDS)
    completionSignal.take() match {
      case ExecuteResponseDTO(updatedTable) => updatedTable
      case ExecuteErrorDTO(failure)         => throw failure
      case null                             => throw new IllegalStateException
    }
  }
}
/** Settings for [[Host]]. */
object Host {

  /**
   * Resource path of the configuration file: the "ConfigComPath" value that
   * BootstrappingUtilities reports for Host, or "/com/empowerops/grid2/Host.conf"
   * when none is set.
   */
  val ConfigComPath =
    BootstrappingUtilities
      .getEnvString(classOf[Host], "ConfigComPath")
      .orElse("/com/empowerops/grid2/Host.conf")
}
package com.empowerops.grid2
import java.util
import akka.actor.{ReceiveTimeout, Actor, ActorLogging}
import akka.cluster.Cluster
import akka.routing.FromConfig
import com.empowerops.common.{EventBus, ModelEvent}
import com.google.common.eventbus.Subscribe
import scala.collection.immutable.HashSet
/**
* Created by Geoff on 2015-09-12.
*/
/**
 * Sits between the local machine and the routed back-end actors.
 *
 * It forwards execute requests to the router child, hands execute responses to `signal`,
 * and relays [[ModelEvent]]s in both directions between `eventBus` and the back end.
 *
 * Created by Geoff on 2015-09-12.
 *
 * @param eventBus local bus; this actor both subscribes to it and posts on it
 * @param signal   queue that receives every execute response
 */
class JobDispatcherActor(eventBus: EventBus,
                         signal: util.Queue[AKKAExecuteResponseMessage])
  extends Actor with ActorLogging {

  // Router child; its settings come from the deployment configuration for ".../router".
  val backend = context.actorOf(
    FromConfig.props(),
    name = "router"
  )

  // Events relayed in one direction whose echo from the other direction is still expected.
  // NOTE(review): pumpEventToActorsOn is an event-bus callback, so if the bus dispatches on the
  // posting thread this var is touched from outside the actor's own message processing -- confirm.
  var syndicatedEvents = new HashSet[ModelEvent]

  eventBus.register(this)

  // NOTE(review): this sends a DTO wrapping a null event once the member is up; the receiving
  // ProxyActor posts whatever event it is given, i.e. null -- confirm this is intentional.
  Cluster(context.system) registerOnMemberUp {
    backend ! ModelEventDTO(null)
  }

  /**
   * Event-bus callback: relays a locally posted event to the back end, unless the event is
   * one this actor itself just posted locally, in which case the marker is cleared instead.
   */
  @Subscribe def pumpEventToActorsOn(event: ModelEvent): Unit = {
    if (syndicatedEvents.contains(event)) {
      syndicatedEvents -= event
    }
    else {
      backend ! ModelEventDTO(event)
      syndicatedEvents += event
    }
  }

  /**
   * Posts an event that arrived from the back end on the local bus, unless it is the echo of
   * an event this actor sent out, in which case the marker is cleared instead.
   */
  def pumpEventToLocalMachine(event: ModelEvent): Unit = {
    if (syndicatedEvents.contains(event)) {
      syndicatedEvents -= event
    }
    else {
      // Mark the event BEFORE posting it. This actor is subscribed to the same bus, so a bus
      // that dispatches on the posting thread calls pumpEventToActorsOn during post(); with the
      // marker set afterwards, that callback saw an unknown event and sent it back to the cluster.
      syndicatedEvents += event
      eventBus.post(event)
    }
  }

  override def receive = {
    case ReceiveTimeout =>
      log.info("Timeout")

    case message: ExecuteRequestDTO =>
      log.info("received execute request")
      backend ! message

    case ModelEventDTO(event) =>
      log.info("recieved model event")
      pumpEventToLocalMachine(event)

    case message: AKKAExecuteResponseMessage =>
      log.info("recieved execution response")
      // offer() reports a full queue by returning false; surface that instead of losing the response silently.
      if (!signal.offer(message)) {
        log.warning("execution response dropped because the completion queue rejected it: {}", message)
      }
  }
}
# Node configuration that adds the ProxyActor role on top of the shared cluster settings.
# NOTE(review): presumably this is ProxyActor.conf, the default resource ProxyActor.main loads -- confirm the file name.
include "cluster"
akka {
# Fixed port; it equals the port of the seed node listed in the shared cluster settings (127.0.0.1:2553).
remote.netty.tcp.port = 2553
# Role this node announces to the cluster.
cluster.roles = [
ProxyActor
]
}
package com.empowerops.grid2
import akka.actor._
import com.empowerops.common.BootstrappingUtilities.{getEnv, getEnvString}
import com.empowerops.common.persistence.ResourceEnvironment.Factory
import com.empowerops.common.{AbstractModule, EventBus, ModelEvent}
import com.empowerops.front_end.execution.{ExternalToolExecutor, ExternalToolExecutorJob, ProcessInfoCollector, ProcessMonitor}
import com.empowerops.grid2.ProxyActor.DefaultEventBus
import com.google.common.eventbus.Subscribe
import com.google.inject.{Guice, Module}
import com.typesafe.config.ConfigFactory
import net.codingwell.scalaguice.InjectorExtensions._
import net.codingwell.scalaguice.ScalaModule
/**
 * Worker actor: builds an [[ExternalToolExecutorJob]] for each [[ExecuteRequestDTO]], runs it and
 * replies to the sender with either the symbol table or the failure. It also relays
 * [[ModelEvent]]s between its event bus and the sender.
 *
 * The event bus and the Guice module can be overridden through the "eventBus" and "module"
 * parameters read with `getEnv`; otherwise [[ProxyActor.DefaultEventBus]] and `RunningModule` are used.
 */
class ProxyActor extends Actor with ActorLogging {

  val eventBus = getEnv(classOf[ProxyActor], "eventBus", classOf[EventBus]).orElse(DefaultEventBus)
  val module = getEnv(classOf[ProxyActor], "module", classOf[Module]).orElse(RunningModule)

  this.eventBus.register(this)

  /** Default bindings: this actor's event bus plus the factories needed to build and run a job. */
  object RunningModule extends AbstractModule with ScalaModule {
    override def configure(): Unit = {
      bind[EventBus].toInstance(eventBus)
      bindFactory(classOf[ExternalToolExecutor.Factory])
      bindFactory(classOf[ExternalToolExecutorJob.Factory])
      bindFactory(classOf[ProcessMonitor.Factory])
      bindFactory(classOf[ProcessInfoCollector.Factory])
    }
  }

  /**
   * Event-bus callback: sends every event posted on this actor's bus to `sender`.
   *
   * NOTE(review): `sender` refers to the sender of the message the actor is currently processing,
   * so this presumably reaches the requester only when the event is posted while `receive` is
   * running (for example by the job itself) -- confirm what happens for events posted at other times.
   */
  @Subscribe def onLocalEvent(event: ModelEvent): Unit = {
    sender ! ModelEventDTO(event)
  }

  // TODO: this should be pumping events identically to the ETJobDispatcher actor
  // I need a test to fail this configuration, but my heads spinning figuring out how to manage that...
  override def receive: Receive = {
    case ExecuteRequestDTO(node, table) =>
      try {
        val injector = Guice.createInjector(module)
        val jobFactory = injector.instance[ExternalToolExecutorJob.Factory]
        val job = jobFactory.create(node, table)
        job.run()
        sender ! ExecuteResponseDTO(table)
      }
      catch {
        // NOTE(review): this catches Throwable, so fatal errors are also shipped to the requester
        // instead of propagating. Host.execute blocks until it gets a reply, so narrowing this
        // catch would leave that caller waiting -- confirm before changing.
        case exception: Throwable =>
          // Pass the exception as the cause so its stack trace is logged; the previous
          // info("... {}", exception) call only rendered it through the message template.
          log.error(exception, "saw exception attempting to evaluate {}", table)
          sender ! ExecuteErrorDTO(exception)
      }

    case ModelEventDTO(event) =>
      log.info("received model event: " + event)
      eventBus.post(event)

    case _ =>
      throw new UnsupportedOperationException()
  }
}
/** Defaults for [[ProxyActor]] and the entry point that starts a node running one. */
object ProxyActor {

  /** Props that create a [[ProxyActor]] through its no-argument constructor. */
  def props(): Props = Props(classOf[ProxyActor])

  /** Event bus used when no "eventBus" parameter is supplied. */
  val DefaultEventBus = new EventBus("ET-Task")

  /** Name of the actor system started by `main`. */
  val SystemName: String = "OASIS-ET-cluster"

  /**
   * Resource path of the configuration file: the "ConfigComPath" value reported for ProxyActor,
   * or "/com/empowerops/grid2/ProxyActor.conf" when none is set.
   */
  val ConfigComPath =
    getEnvString(classOf[ProxyActor], "ConfigComPath")
      .orElse("/com/empowerops/grid2/ProxyActor.conf")

  /**
   * Loads the configuration, starts the actor system and creates one ProxyActor named "ProxyActor".
   *
   * @param args ignored
   */
  def main(args: Array[String]): Unit = {
    // locate the configuration file
    val resources = new Factory().forServiceWithoutView(classOf[ProxyActor])
    val configFile = resources.loadResourceAt(ConfigComPath).asFile()

    // start the actor system from that configuration
    val actorSystem = ActorSystem(SystemName, ConfigFactory.parseFile(configFile))

    // register the worker under the name "ProxyActor"
    actorSystem.actorOf(props(), name = "ProxyActor")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment