Created
October 3, 2015 06:12
-
-
Save Groostav/92054036601fab832a37 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
akka { | |
loglevel = "INFO" | |
actor { | |
serializers { | |
xstream = "com.empowerops.common.AKKAXStreamXMLSerializer" | |
} | |
serialization-bindings { | |
"java.lang.Object" = xstream | |
"java.io.Serializable" = none | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
include "base" | |
akka { | |
log-dead-letters = on | |
actor { | |
provider = "akka.cluster.ClusterActorRefProvider" | |
deployment{ | |
/JobDispatcherActor/router = { | |
router = adaptive-group | |
metrics-selector = mix | |
nr-of-instances = 100 | |
routees.paths = ["/user/JobDispatcherActor"] | |
cluster { | |
enabled = on | |
use-role = backend | |
allow-local-routees = off | |
} | |
} | |
} | |
} | |
remote { | |
enabled-transports = ["akka.remote.netty.tcp"] | |
netty.tcp { | |
hostname = "127.0.0.1" | |
} | |
log-sent-messages = on | |
log-received-messages = on | |
log-remote-lifecycle-events = on | |
} | |
cluster { | |
seed-nodes = [ | |
"akka.tcp://OASIS-ET-cluster@127.0.0.1:2553" | |
] | |
auto-down-unreachable-after = 10s | |
min-nr-of-members = 1 | |
} | |
role{ | |
ProxyActor.min-nr-of-members = 1 | |
JobDispatcherActor.min-nr-of-members = 1 | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.empowerops.grid2 | |
import com.empowerops.algorithms.SymbolTable | |
import com.empowerops.common.ModelEvent | |
import com.empowerops.front_end.model.ExternalToolProxyNode | |
sealed trait AKKAMessage | |
sealed trait AKKAExecuteResponseMessage extends AKKAMessage | |
case class ModelEventDTO(event: ModelEvent) | |
extends AKKAMessage | |
// class OptionDTO(option: String, arg: String) | |
// case class ExecuteRequestDTO(executable: Path, options: Set[OptionDTO], inputFile: Path, outputFile: Path, timeout: time.Duration, inputsByName: Map[String, Double]) | |
case class ExecuteRequestDTO(node: ExternalToolProxyNode, symbolTable: SymbolTable) | |
extends AKKAMessage | |
case class ExecuteResponseDTO(symbolTable: SymbolTable) | |
extends AKKAExecuteResponseMessage | |
case class ExecuteErrorDTO(exception: Throwable) | |
extends AKKAExecuteResponseMessage |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.empowerops.grid2; | |
import com.empowerops.algorithms.SymbolTable; | |
import com.empowerops.common.ModelEvent; | |
import com.empowerops.common.assists.CommonFixtureBase; | |
import com.empowerops.common.assists.Count; | |
import com.empowerops.common.assists.CountingEventBus; | |
import com.empowerops.common.documentation.ReflectionSensitive; | |
import com.empowerops.common.model.VariableSymbol; | |
import com.empowerops.front_end.execution.ExternalToolExecutorJob; | |
import com.empowerops.front_end.model.ExternalToolProxyNode; | |
import com.empowerops.linqalike.Queryable; | |
import com.google.inject.Module; | |
import org.junit.After; | |
import org.junit.Before; | |
import org.junit.Test; | |
import static com.empowerops.assists.Exceptions.assertThrows; | |
import static com.empowerops.common.BootstrappingUtilities.getParamName; | |
import static com.empowerops.common.BootstrappingUtilities.popProperty; | |
import static com.empowerops.common.BootstrappingUtilities.pushProperty; | |
import static com.empowerops.common.model.VariableSymbolBuilder.*; | |
import static com.empowerops.linqalike.common.Tuple.pair; | |
import static org.assertj.core.api.Assertions.assertThat; | |
import static org.mockito.Matchers.eq; | |
import static org.mockito.Mockito.*; | |
/** | |
* Please note this test system tests a number of classes, namely: | |
* <ul> | |
* <li>{@link com.empowerops.grid2.Host}</li> | |
* <li>{@link com.empowerops.grid2.ProxyActor}</li> | |
* <li>{@link com.empowerops.grid2.JobDispatcherActor}</li> | |
* </ul> | |
* | |
* Created by Geoff on 2015-09-11. | |
*/ | |
public class ExternalToolActorSystemAcceptanceTests extends CommonFixtureBase{ | |
private static final VariableSymbol Y1 = objective("Y1").build(); | |
private static final Queryable<VariableSymbol> inputs = inputs("in1", "in2", "in3").build(); | |
private static final Class<ProxyActor> PAClass = ProxyActor.class; | |
private static final Class<Host> HostClass = Host.class; | |
private ExternalToolProxyNode node; | |
private SymbolTable runningTable; | |
private static @ReflectionSensitive CountingEventBus eventBusForTest; | |
private static @ReflectionSensitive Module moduleForTest; | |
private ExternalToolExecutorJob.Factory factory; | |
@Before | |
public void setup_node_and_some_simple_vars() { | |
node = new ExternalToolProxyNode(); | |
node.getProducedVariables().add(Y1); | |
node.getConsumedVariables().addAll(inputs); | |
runningTable = new SymbolTable(); | |
runningTable.addAll(pair(inputs.first(), 1.0), pair(inputs.second(), 2.0), pair(inputs.last(), 3.0)); | |
} | |
@Before | |
public void setup_akka_config() { | |
factory = mock(ExternalToolExecutorJob.Factory.class); | |
moduleForTest = binder -> binder.bind(ExternalToolExecutorJob.Factory.class).toInstance(factory); | |
eventBusForTest = new CountingEventBus(); | |
pushProperty(getParamName(PAClass, "module"), "com.empowerops.grid2.ExternalToolActorSystemAcceptanceTests.moduleForTest"); | |
pushProperty(getParamName(PAClass, "eventBus"), "com.empowerops.grid2.ExternalToolActorSystemAcceptanceTests.eventBusForTest"); | |
// pushProperty(getParamName(PAClass, "ConfigComPath"), "/com/empowerops/grid2/base.conf"); | |
// pushProperty(getParamName(HostClass, "ConfigComPath"), "/com/empowerops/grid2/base.conf"); | |
} | |
@After | |
public void remove_akka_config_from_jvm() { | |
// popProperty(getParamName(HostClass, "ConfigComPath")); | |
// popProperty(getParamName(PAClass, "ConfigComPath")); | |
popProperty(getParamName(PAClass, "eventBus")); | |
popProperty(getParamName(PAClass, "module")); | |
} | |
@Test | |
public void when_dispatching_a_valid_job_should_create_and_run_ET_execution_job() { | |
//setup | |
ExternalToolExecutorJob job = mock(ExternalToolExecutorJob.class); | |
when(factory.create(eq(node), eq(runningTable))).thenReturn(job); | |
doAnswer(invoc -> runningTable.put(Y1, 42.0)).when(job).run(); | |
Host host = new Host(eventBus, resourceEnvironmentFactory); | |
//act | |
SymbolTable result = host.execute(node, runningTable); | |
//assert | |
verify(factory, once()).create(eq(node), eq(runningTable)); | |
verify(job, once()).run(); | |
assertThat(result.getValueFor(node.getProducedVariables().single())).isEqualTo(42.0); | |
} | |
@Test | |
@SuppressWarnings("ThrowableResultOfMethodCallIgnored") // part of the test | |
public void when_dispatching_a_job_that_throws_exception_should_be_exposed_to_caller(){ | |
//setup | |
ExternalToolExecutorJob job = mock(ExternalToolExecutorJob.class); | |
when(factory.create(eq(node), eq(runningTable))).thenReturn(job); | |
doThrow(new TestException()).when(job).run(); | |
Host host = new Host(eventBus, resourceEnvironmentFactory); | |
//act & a little assert | |
assertThrows(TestException.class, () -> host.execute(node, runningTable)); | |
//more assert | |
verify(factory, once()).create(eq(node), eq(runningTable)); | |
verify(job, once()).run(); | |
} | |
public static class TestException extends RuntimeException{ | |
private static final long serialVersionUID = 4185502332377683589L; | |
} | |
@Test | |
public void when_dispatching_a_job_that_posts_an_event_we_should_recieve_that_event(){ | |
//setup | |
ExternalToolExecutorJob job = mock(ExternalToolExecutorJob.class); | |
when(factory.create(eq(node), eq(runningTable))).thenReturn(job); | |
doAnswer(invoc -> { eventBusForTest.post(new NeatoEvent()); return null; }).when(job).run(); | |
Host host = new Host(eventBus, resourceEnvironmentFactory); | |
//act | |
host.execute(node, runningTable); | |
//assert | |
verify(factory, once()).create(node, runningTable); | |
verify(job, once()).run(); | |
eventBus.shouldHaveBeenAskedToPost(Count.Once, NeatoEvent.class); | |
} | |
public static class NeatoEvent implements ModelEvent {} | |
@Test | |
public void when_posting_an_event_locally_that_event_makes_it_to_all_actors(){ | |
//setup | |
when(factory.create(eq(node), eq(runningTable))).thenReturn(mock(ExternalToolExecutorJob.class)); | |
Host host = new Host(eventBus, resourceEnvironmentFactory); | |
//act | |
host.execute(node, runningTable); | |
eventBus.post(new NeatoEvent()); | |
//more assert | |
eventBusForTest.shouldHaveBeenAskedToPost(Count.Once, NeatoEvent.class); | |
} | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
include "cluster" | |
akka{ | |
remote.netty.tcp.port=0 | |
cluster.roles = [ | |
JobDispatcherActor | |
] | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.empowerops.grid2 | |
import java.util.concurrent.ArrayBlockingQueue | |
import akka.actor.{ActorRef, ActorSystem, Props} | |
import com.empowerops.algorithms.SymbolTable | |
import com.empowerops.common.persistence.ResourceEnvironment | |
import com.empowerops.common.{BootstrappingUtilities, EventBus} | |
import com.empowerops.front_end.model.ExternalToolProxyNode | |
import com.google.inject.Inject | |
import com.typesafe.config.ConfigFactory | |
/** | |
* Created by Geoff on 2015-09-12. | |
*/ | |
class Host @Inject() (eventBus: EventBus, envFactory: ResourceEnvironment.Factory){ | |
//create a tool to make the call synchronous. | |
val completionSignal = new ArrayBlockingQueue[AKKAExecuteResponseMessage](1) | |
// Create an Akka system | |
val env = envFactory.forServiceWithoutView(classOf[Host]) | |
val config = ConfigFactory.parseFile(env.loadResourceAt(Host.ConfigComPath).asFile) | |
val system = ActorSystem("OASIS-ET-cluster", config) | |
//add that dispatcher to the system | |
var dispatcherActor : ActorRef = null | |
dispatcherActor = system.actorOf( | |
props = Props(new JobDispatcherActor(eventBus, completionSignal)), | |
name = "JobDispatcherActor" | |
) | |
def execute(node: ExternalToolProxyNode, symbolTable: SymbolTable): SymbolTable ={ | |
// start the calculation | |
dispatcherActor ! new ExecuteRequestDTO(node, symbolTable) | |
//wait for the job to complete | |
// val result = completionSignal.poll(5, TimeUnit.SECONDS) | |
val result = completionSignal.take() | |
//return the new symbol table | |
result match{ | |
case result: ExecuteResponseDTO => result.symbolTable | |
case result: ExecuteErrorDTO => throw result.exception | |
case null => throw new IllegalStateException | |
} | |
} | |
} | |
object Host{ | |
val ConfigComPath = BootstrappingUtilities.getEnvString(classOf[Host], "ConfigComPath").orElse("/com/empowerops/grid2/Host.conf"); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.empowerops.grid2 | |
import java.util | |
import akka.actor.{ReceiveTimeout, Actor, ActorLogging} | |
import akka.cluster.Cluster | |
import akka.routing.FromConfig | |
import com.empowerops.common.{EventBus, ModelEvent} | |
import com.google.common.eventbus.Subscribe | |
import scala.collection.immutable.HashSet | |
/** | |
* Created by Geoff on 2015-09-12. | |
*/ | |
class JobDispatcherActor(eventBus: EventBus, | |
signal: util.Queue[AKKAExecuteResponseMessage]) | |
extends Actor with ActorLogging { | |
val backend = context.actorOf( | |
FromConfig.props(), | |
name = "router" | |
) | |
var syndicatedEvents = new HashSet[ModelEvent] | |
eventBus.register(this) | |
Cluster(context.system) registerOnMemberUp { | |
backend ! new ModelEventDTO(null) | |
} | |
@Subscribe def pumpEventToActorsOn(event: ModelEvent): Unit ={ | |
if ( ! syndicatedEvents.contains(event)) { | |
backend ! new ModelEventDTO(event) | |
syndicatedEvents += event | |
} | |
else{ | |
syndicatedEvents -= event | |
} | |
} | |
def pumpEventToLocalMachine(event: ModelEvent): Unit = { | |
if ( ! syndicatedEvents.contains (event)) { | |
eventBus.post (event) | |
syndicatedEvents += event | |
} | |
else { | |
syndicatedEvents -= event | |
} | |
} | |
override def receive = { | |
case ReceiveTimeout => | |
log.info("Timeout"); | |
case message : ExecuteRequestDTO => | |
log.info("received execute request") | |
backend ! message | |
case ModelEventDTO(event) => | |
log.info("recieved model event") | |
pumpEventToLocalMachine(event) | |
case message : AKKAExecuteResponseMessage => | |
log.info("recieved execution response") | |
signal.offer(message) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
include "cluster" | |
akka { | |
remote.netty.tcp.port = 2553 | |
cluster.roles = [ | |
ProxyActor | |
] | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.empowerops.grid2 | |
import akka.actor._ | |
import com.empowerops.common.BootstrappingUtilities.{getEnv, getEnvString} | |
import com.empowerops.common.persistence.ResourceEnvironment.Factory | |
import com.empowerops.common.{AbstractModule, EventBus, ModelEvent} | |
import com.empowerops.front_end.execution.{ExternalToolExecutor, ExternalToolExecutorJob, ProcessInfoCollector, ProcessMonitor} | |
import com.empowerops.grid2.ProxyActor.DefaultEventBus | |
import com.google.common.eventbus.Subscribe | |
import com.google.inject.{Guice, Module} | |
import com.typesafe.config.ConfigFactory | |
import net.codingwell.scalaguice.InjectorExtensions._ | |
import net.codingwell.scalaguice.ScalaModule | |
class ProxyActor extends Actor with ActorLogging { | |
val eventBus = getEnv(classOf[ProxyActor], "eventBus", classOf[EventBus]) .orElse(DefaultEventBus) | |
val module = getEnv(classOf[ProxyActor], "module", classOf[Module]) .orElse(RunningModule) | |
this.eventBus.register(this) | |
object RunningModule extends AbstractModule with ScalaModule { | |
override def configure() { | |
bind[EventBus].toInstance(eventBus) | |
bindFactory(classOf[ExternalToolExecutor.Factory]) | |
bindFactory(classOf[ExternalToolExecutorJob.Factory]) | |
bindFactory(classOf[ProcessMonitor.Factory]) | |
bindFactory(classOf[ProcessInfoCollector.Factory]) | |
} | |
} | |
@Subscribe def onLocalEvent(event: ModelEvent): Unit ={ | |
sender ! new ModelEventDTO(event) | |
} | |
// TODO: this should be pumping events identically to the ETJobDispatcher actor | |
// I need a test to fail this configuration, but my heads spinning figuring out how to manage that... | |
override def receive: Receive = { | |
case ExecuteRequestDTO(node, table) => | |
try { | |
val injector = Guice.createInjector(module) | |
val jobFactory = injector.instance[ExternalToolExecutorJob.Factory] | |
val job = jobFactory.create(node, table) | |
job.run() | |
sender ! new ExecuteResponseDTO(table) | |
} | |
catch{ | |
case exception: Throwable => | |
log.info("saw exception attempting to evaluate " + table) | |
log.info("aforementioned exception: {}", exception) | |
sender ! new ExecuteErrorDTO(exception) | |
} | |
case ModelEventDTO(event) => | |
log.info("received model event: " + event) | |
eventBus.post(event) | |
case _ => | |
throw new UnsupportedOperationException() | |
} | |
} | |
object ProxyActor{ | |
def props(): Props = Props(classOf[ProxyActor]) | |
val DefaultEventBus = new EventBus("ET-Task") | |
val SystemName: String = "OASIS-ET-cluster" | |
val ConfigComPath = getEnvString(classOf[ProxyActor], "ConfigComPath").orElse("/com/empowerops/grid2/ProxyActor.conf") | |
def main(args: Array[String]) : Unit = { | |
//get the configuration file from classpath | |
val env = new Factory().forServiceWithoutView(classOf[ProxyActor]) | |
val config = ConfigFactory.parseFile(env.loadResourceAt(ConfigComPath).asFile()) | |
//create an actor system with that config | |
val system = ActorSystem(SystemName , config) | |
//create a remote actor from actorSystem | |
val remote = system.actorOf(Props[ProxyActor], name = "ProxyActor") | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment