Skip to content

Instantly share code, notes, and snippets.

@Groostav
Last active September 20, 2015 20:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Groostav/a7660234c2059dabce35 to your computer and use it in GitHub Desktop.
Save Groostav/a7660234c2059dabce35 to your computer and use it in GitHub Desktop.
First crack at Akka and Scala! Please critique!
package com.empowerops.common;
import java.io.ByteArrayInputStream;
/**
 * Akka serializer that turns messages into XML bytes (and back) through the
 * project's {@code Serializer}.
 *
 * <p>No manifest is included with the payload: {@link #fromBinaryJava} ignores its
 * {@code manifest} argument and relies on the XML document alone.
 */
public class AKKAXStreamXMLSerializer extends akka.serialization.JSerializer {

    //TODO replace with guice injection
    private final Serializer serializer = new Serializer(
            SerializationConfiguration.Factory.Default,
            new Serializer.CustomConverterPackage(null, null),
            new OASISSnakeYAMLConstructor(),
            new OASISSnakeYAMLRepresenter()
    );

    /**
     * Rebuilds a message from the XML document held in {@code bytes}.
     *
     * @param bytes    the XML document produced by {@link #toBinary}
     * @param manifest unused, see {@link #includeManifest()}
     * @return the deserialized object
     */
    @Override public Object fromBinaryJava(byte[] bytes, Class<?> manifest) {
        return serializer.getXMLSerializer().fromXML(new ByteArrayInputStream(bytes));
    }

    /** @return the fixed id that distinguishes this serializer from the others registered with Akka. */
    @Override public int identifier() {
        return 872391;
    }

    /**
     * Serializes {@code o} to XML and encodes that text as UTF-8.
     *
     * <p>The charset is named explicitly: {@code String.getBytes()} without an argument
     * uses the JVM's default charset, which can differ between the two ends of a remote
     * connection and corrupt non-ASCII characters.
     * NOTE(review): assumes the XML reader behind {@code fromXML(InputStream)} decodes
     * UTF-8 when the document declares no other encoding -- confirm.
     *
     * @param o the message to serialize
     * @return the UTF-8 encoded XML document
     */
    @Override public byte[] toBinary(Object o) {
        return serializer.toXML(o).getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }

    /** @return {@code false}; no class manifest is sent alongside the payload. */
    @Override public boolean includeManifest() {
        return false;
    }
}
# Remoting settings layered on top of the shared "base" configuration.
include "base"

akka.actor.provider = "akka.remote.RemoteActorRefProvider"

akka.remote {
  enabled-transports = ["akka.remote.netty.tcp"]
  netty.tcp.hostname = "127.0.0.1"

  # message logging is switched on for both directions
  log-sent-messages = on
  log-received-messages = on
}
# Shared settings: log level plus the serializer used for actor messages.
akka.loglevel = "INFO"

akka.actor {
  # alias -> serializer implementation
  serializers.xstream = "com.empowerops.common.AKKAXStreamXMLSerializer"

  # message type -> serializer alias
  serialization-bindings {
    "java.lang.Object" = xstream
    "java.io.Serializable" = none
  }
}
package com.empowerops.grid2
import com.empowerops.algorithms.SymbolTable
import com.empowerops.common.ModelEvent
import com.empowerops.front_end.model.ExternalToolProxyNode
/** Root of the message types declared in this file. */
sealed trait AKKAMessage

/** The messages that report the outcome of an [[ExecuteRequestDTO]]. */
sealed trait AKKAExecuteResponseMessage extends AKKAMessage

/** Wraps a [[ModelEvent]] so it can be passed between actors. */
case class ModelEventDTO(event: ModelEvent) extends AKKAMessage

// class OptionDTO(option: String, arg: String) //no no I can use a scala tuple! :D now to figure out tuple syntax...
// case class ExecuteRequestDTO(executable: Path, options: Set[OptionDTO], inputFile: Path, outputFile: Path, timeout: time.Duration, inputsByName: Map[String, Double])

/** Asks for `node` to be executed against the values in `symbolTable`. */
case class ExecuteRequestDTO(node: ExternalToolProxyNode, symbolTable: SymbolTable) extends AKKAMessage

/** Successful outcome: the symbol table after the job has run. */
case class ExecuteResponseDTO(symbolTable: SymbolTable) extends AKKAExecuteResponseMessage

/** Failed outcome: the exception raised while running the job. */
case class ExecuteErrorDTO(exception: Throwable) extends AKKAExecuteResponseMessage
# Remote configuration that binds the TCP transport to the fixed port 5150.
include "base-remote"

akka.remote.netty.tcp.port = 5150
package com.empowerops.grid2;
import com.empowerops.algorithms.SymbolTable;
import com.empowerops.common.ModelEvent;
import com.empowerops.common.assists.CommonFixtureBase;
import com.empowerops.common.assists.Count;
import com.empowerops.common.assists.CountingEventBus;
import com.empowerops.common.documentation.ReflectionSensitive;
import com.empowerops.common.model.VariableSymbol;
import com.empowerops.front_end.execution.ExternalToolExecutorJob;
import com.empowerops.front_end.model.ExternalToolProxyNode;
import com.empowerops.linqalike.Queryable;
import com.google.inject.Module;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static com.empowerops.assists.Exceptions.assertThrows;
import static com.empowerops.common.BootstrappingUtilities.getParamName;
import static com.empowerops.common.BootstrappingUtilities.popProperty;
import static com.empowerops.common.BootstrappingUtilities.pushProperty;
import static com.empowerops.common.model.VariableSymbolBuilder.*;
import static com.empowerops.linqalike.common.Tuple.pair;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.*;
/**
 * Acceptance tests for the external-tool actor system.
 *
 * <p>Please note this test system tests a number of classes, namely:
 * <ul>
 * <li>{@link com.empowerops.grid2.Host}</li>
 * <li>{@link com.empowerops.grid2.ProxyActor}</li>
 * <li>{@link com.empowerops.grid2.JobDispatcherActor}</li>
 * </ul>
 *
 * <p>Before each test a mocked {@link ExternalToolExecutorJob.Factory} and a
 * {@link CountingEventBus} are stored in static fields of this class, and properties are
 * pushed that name those fields; the properties are popped again after each test.
 *
 * Created by Geoff on 2015-09-11.
 */
public class ExternalToolActorSystemAcceptanceTests extends CommonFixtureBase{

    private static final VariableSymbol Y1 = objective("Y1").build();
    private static final Queryable<VariableSymbol> inputs = inputs("in1", "in2", "in3").build();

    private static final Class<ProxyActor> PAClass = ProxyActor.class;
    private static final Class<Host> HostClass = Host.class;

    // Prefix for the property values that name the static fields below. It is derived
    // from the class so a rename cannot leave the values stale; they previously named
    // "com.empowerops.grid2.ExternalToolProxyTaskFixture", which is not the class that
    // declares moduleForTest / eventBusForTest.
    // NOTE(review): assumes the values are resolved as "<class name>.<static field>" -- confirm.
    private static final String ThisFixture = ExternalToolActorSystemAcceptanceTests.class.getName();

    private ExternalToolProxyNode node;
    private SymbolTable runningTable;

    private static @ReflectionSensitive CountingEventBus eventBusForTest;
    private static @ReflectionSensitive Module moduleForTest;
    private ExternalToolExecutorJob.Factory factory;

    @Before
    public void setup_node_and_some_simple_vars() {
        node = new ExternalToolProxyNode();
        node.getProducedVariables().add(Y1);
        node.getConsumedVariables().addAll(inputs);

        runningTable = new SymbolTable();
        runningTable.addAll(pair(inputs.first(), 1.0), pair(inputs.second(), 2.0), pair(inputs.last(), 3.0));
    }

    @Before
    public void setup_akka_config() {
        factory = mock(ExternalToolExecutorJob.Factory.class);
        moduleForTest = binder -> binder.bind(ExternalToolExecutorJob.Factory.class).toInstance(factory);
        eventBusForTest = new CountingEventBus();

        pushProperty(getParamName(PAClass, "module"), ThisFixture + ".moduleForTest");
        pushProperty(getParamName(PAClass, "eventBus"), ThisFixture + ".eventBusForTest");
        pushProperty(getParamName(PAClass, "ConfigComPath"), "/com/empowerops/grid2/base.conf");
        pushProperty(getParamName(HostClass, "ConfigComPath"), "/com/empowerops/grid2/base.conf");
    }

    @After
    public void remove_akka_config_from_jvm() {
        // popped in the reverse order of the pushes in setup_akka_config
        popProperty(getParamName(HostClass, "ConfigComPath"));
        popProperty(getParamName(PAClass, "ConfigComPath"));
        popProperty(getParamName(PAClass, "eventBus"));
        popProperty(getParamName(PAClass, "module"));
    }

    @Test
    public void when_dispatching_a_valid_job_should_create_and_run_ET_execution_job() {
        //setup
        ExternalToolExecutorJob job = mock(ExternalToolExecutorJob.class);
        when(factory.create(eq(node), eq(runningTable))).thenReturn(job);
        doAnswer(invoc -> runningTable.put(Y1, 42.0)).when(job).run();
        Host host = new Host(eventBus, resourceEnvironmentFactory);

        //act
        SymbolTable result = host.execute(node, runningTable);

        //assert
        verify(factory, once()).create(eq(node), eq(runningTable));
        verify(job, once()).run();
        assertThat(result.getValueFor(node.getProducedVariables().single())).isEqualTo(42.0);
    }

    @Test
    @SuppressWarnings("ThrowableResultOfMethodCallIgnored") // part of the test
    public void when_dispatching_a_job_that_throws_exception_should_be_exposed_to_caller(){
        //setup
        ExternalToolExecutorJob job = mock(ExternalToolExecutorJob.class);
        when(factory.create(eq(node), eq(runningTable))).thenReturn(job);
        doThrow(new TestException()).when(job).run();
        Host host = new Host(eventBus, resourceEnvironmentFactory);

        //act & a little assert
        assertThrows(TestException.class, () -> host.execute(node, runningTable));

        //more assert
        verify(factory, once()).create(eq(node), eq(runningTable));
        verify(job, once()).run();
    }

    /** Exception thrown by the stubbed job in the failure test. */
    public static class TestException extends RuntimeException{
        private static final long serialVersionUID = 4185502332377683589L;
    }

    @Test
    public void when_dispatching_a_job_that_posts_an_event_we_should_recieve_that_event(){
        //setup
        ExternalToolExecutorJob job = mock(ExternalToolExecutorJob.class);
        when(factory.create(eq(node), eq(runningTable))).thenReturn(job);
        doAnswer(invoc -> { eventBusForTest.post(new NeatoEvent()); return null; }).when(job).run();
        Host host = new Host(eventBus, resourceEnvironmentFactory);

        //act
        host.execute(node, runningTable);

        //assert
        verify(factory, once()).create(node, runningTable);
        verify(job, once()).run();
        eventBus.shouldHaveBeenAskedToPost(Count.Once, NeatoEvent.class);
    }

    /** Event with no payload, used by the event-forwarding tests. */
    public static class NeatoEvent implements ModelEvent {}

    @Test
    public void when_posting_an_event_locally_that_event_makes_it_to_all_actors(){
        //setup
        when(factory.create(eq(node), eq(runningTable))).thenReturn(mock(ExternalToolExecutorJob.class));
        Host host = new Host(eventBus, resourceEnvironmentFactory);

        //act
        host.execute(node, runningTable);
        eventBus.post(new NeatoEvent());

        //assert
        eventBusForTest.shouldHaveBeenAskedToPost(Count.Once, NeatoEvent.class);
    }
}
# Remote configuration with the TCP port set to 0.
# NOTE(review): 0 presumably asks the transport to pick any free port -- confirm against the Akka remoting docs.
include "base-remote"

akka.remote.netty.tcp.port = 0
package com.empowerops.grid2
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import akka.actor.{ActorSystem, Props}
import com.empowerops.algorithms.SymbolTable
import com.empowerops.common.persistence.ResourceEnvironment
import com.empowerops.common.persistence.ResourceEnvironment.Factory
import com.empowerops.common.{BootstrappingUtilities, EventBus}
import com.empowerops.front_end.model.ExternalToolProxyNode
import com.typesafe.config.ConfigFactory
/**
 * Blocking entry point to the dispatcher-side actor system.
 *
 * Construction loads the Akka configuration found at [[Host.ConfigComPath]], starts the
 * "OASIS-ET-Dispatcher-Side" actor system and registers a [[JobDispatcherActor]] in it.
 * The dispatcher hands its replies back through `completionSignal`.
 *
 * Created by Geoff on 2015-09-12.
 *
 * @param eventBus   event bus handed to the dispatcher actor
 * @param envFactory used to locate the configuration resource
 */
class Host(eventBus: EventBus, envFactory: ResourceEnvironment.Factory){

  // Create an Akka system
  val env = envFactory.forServiceWithoutView(classOf[Host])
  val config = ConfigFactory.parseFile(env.loadResourceAt(Host.ConfigComPath).asFile)
  val system = ActorSystem("OASIS-ET-Dispatcher-Side", config)

  //create a tool to make the call synchronous.
  val completionSignal = new ArrayBlockingQueue[AKKAExecuteResponseMessage](1)

  //add that dispatcher to the system
  val dispatcherActor = system.actorOf(
    props = Props(new JobDispatcherActor(eventBus, completionSignal)),
    name = "Dispatcher"
  )

  /**
   * Sends the node to the dispatcher and blocks for up to 5 seconds for the outcome.
   *
   * @param node        the node to execute
   * @param symbolTable the values the node is executed against
   * @return the symbol table carried by the [[ExecuteResponseDTO]] reply
   * @throws IllegalStateException if no reply arrives within 5 seconds
   * @throws Throwable whatever exception an [[ExecuteErrorDTO]] reply carries
   */
  def execute(node: ExternalToolProxyNode, symbolTable: SymbolTable): SymbolTable = {
    // A reply to an earlier call that timed out may have arrived in the meantime; drop it
    // so it is not mistaken for the answer to this request. (A reply that arrives late
    // *during* this call can still be picked up -- the messages carry no correlation id.)
    completionSignal.clear()

    // start the calculation
    dispatcherActor ! ExecuteRequestDTO(node, symbolTable)

    //wait for the job to complete; poll yields null on timeout, which Option turns into None
    Option(completionSignal.poll(5, TimeUnit.SECONDS)) match {
      case Some(ExecuteResponseDTO(resultTable)) => resultTable
      case Some(ExecuteErrorDTO(exception)) => throw exception
      case None => throw new IllegalStateException(
        "timed out after 5 seconds waiting for the dispatcher to report a result"
      )
    }
  }
}
/** Companion of [[Host]]: configuration location plus a manual entry point. */
object Host {

  // Resource path of the Akka configuration; "/com/empowerops/grid2/Host-remote.conf"
  // is the fallback when getEnvString yields nothing for "ConfigComPath".
  val ConfigComPath = BootstrappingUtilities
    .getEnvString(classOf[Host], "ConfigComPath")
    .orElse("/com/empowerops/grid2/Host-remote.conf")

  /** Builds a Host on a fresh event bus and executes one empty node against an empty symbol table. */
  def main(args: Array[String]): Unit = {
    val testingBus = new EventBus("Testing Host")
    val host = new Host(testingBus, new Factory)
    host.execute(new ExternalToolProxyNode, new SymbolTable)
  }
}
package com.empowerops.grid2
import java.util
import akka.actor.Actor
import akka.routing.RoundRobinRouter
import com.empowerops.common.{EventBus, ModelEvent}
import com.empowerops.front_end.execution.ExternalToolProxyTask
import com.google.common.eventbus.Subscribe
import scala.collection.immutable.HashSet
/**
 * Dispatcher-side actor: forwards execute requests and model events to the worker actor
 * and relays the worker's answers back to the local machine.
 *
 * `syndicatedEvents` remembers events this actor has already passed on in one direction,
 * so that the copy coming back from the other side is dropped instead of being forwarded
 * again.
 *
 * NOTE(review): the `@Subscribe` callback mutates `syndicatedEvents`; if the event bus
 * invokes it on a thread other than the actor's, that access is unsynchronized -- confirm
 * how the bus dispatches.
 *
 * Created by Geoff on 2015-09-12.
 *
 * @param eventBus local event bus this actor subscribes to and posts remote events on
 * @param signal   queue that receives the execute responses
 */
class JobDispatcherActor(eventBus: EventBus,
                         signal: util.Queue[AKKAExecuteResponseMessage])
  extends Actor {

  // the worker is looked up at a hard-coded address: 127.0.0.1, port 5150
  val workerActor = context.actorSelection(
    s"akka.tcp://${ProxyActor.SystemName}@127.0.0.1:5150/user/${ProxyActor.ActorName}"
  )

  val workerRouter = context.actorOf(
    ProxyActor.props().withRouter(RoundRobinRouter(1)),
    name = "OASIS-ET-network-dispatcher"
  )

  var syndicatedEvents = new HashSet[ModelEvent]

  eventBus.register(this)

  /** Event-bus callback: sends a locally posted event to the worker unless it is an echo. */
  @Subscribe def pumpEventToActorsOn(event: ModelEvent): Unit = {
    if (syndicatedEvents.contains(event)) {
      // this event was put on the bus by pumpEventToLocalMachine; swallow the echo
      syndicatedEvents -= event
    }
    else {
      // record before sending so the mark is in place by the time the worker's copy returns
      syndicatedEvents += event
      workerActor ! ModelEventDTO(event)
    }
  }

  /** Posts an event received from the worker on the local bus unless it is an echo. */
  def pumpEventToLocalMachine(event: ModelEvent): Unit = {
    if (syndicatedEvents.contains(event)) {
      // this event was sent out by pumpEventToActorsOn; swallow the echo
      syndicatedEvents -= event
    }
    else {
      // Record BEFORE posting: this actor subscribes to the same bus, so a bus that
      // dispatches synchronously calls pumpEventToActorsOn from inside post(). With the
      // mark added only afterwards, that callback saw the event as new and sent it
      // straight back to the worker.
      syndicatedEvents += event
      eventBus.post(event)
    }
  }

  override def receive: Receive = {
    case message: ExecuteRequestDTO =>
      workerActor ! message
    case ModelEventDTO(event) =>
      pumpEventToLocalMachine(event)
    case message: AKKAExecuteResponseMessage =>
      // offer's result is not checked: a response is discarded when the queue is full
      signal.offer(message)
  }
}
package com.empowerops.grid2
import java.util.logging.{Level, Logger}
import akka.actor._
import com.empowerops.common.BootstrappingUtilities.{getEnv, getEnvString}
import com.empowerops.common.persistence.ResourceEnvironment.Factory
import com.empowerops.common.{AbstractModule, EventBus, ModelEvent}
import com.empowerops.front_end.execution.{ExternalToolExecutor, ExternalToolExecutorJob, ProcessInfoCollector, ProcessMonitor}
import com.empowerops.grid2.ProxyActor.DefaultEventBus
import com.google.common.eventbus.Subscribe
import com.google.inject.{Guice, Module}
import com.typesafe.config.ConfigFactory
import net.codingwell.scalaguice.InjectorExtensions._
import net.codingwell.scalaguice.ScalaModule
/**
 * Worker-side actor: runs an external-tool job for each [[ExecuteRequestDTO]] and answers
 * with either an [[ExecuteResponseDTO]] or an [[ExecuteErrorDTO]]. It also passes
 * [[ModelEventDTO]]s on to its event bus and sends events seen on that bus to `sender`.
 *
 * The event bus and the Guice module come from `getEnv`, falling back to
 * `ProxyActor.DefaultEventBus` and `RunningModule` respectively.
 */
class ProxyActor extends Actor {
  import scala.util.control.NonFatal

  val log = Logger.getLogger(classOf[ProxyActor].getCanonicalName)
  val eventBus = getEnv(classOf[ProxyActor], "eventBus", classOf[EventBus]) .orElse(DefaultEventBus)
  val module = getEnv(classOf[ProxyActor], "module", classOf[Module]) .orElse(RunningModule)

  this.eventBus.register(this)

  /** Default bindings, used when no module is supplied through `getEnv`. */
  object RunningModule extends AbstractModule with ScalaModule {
    override def configure(): Unit = {
      bind[EventBus].toInstance(eventBus)
      bindFactory(classOf[ExternalToolExecutor.Factory])
      bindFactory(classOf[ExternalToolExecutorJob.Factory])
      bindFactory(classOf[ProcessMonitor.Factory])
      bindFactory(classOf[ProcessInfoCollector.Factory])
    }
  }

  /**
   * Event-bus callback: wraps the event and sends it to `sender`.
   *
   * NOTE(review): `sender` is only meaningful while this actor is processing a message;
   * if the bus ever invokes this callback outside of `receive`, the event goes to the
   * wrong recipient -- confirm how the bus dispatches.
   */
  @Subscribe def onLocalEvent(event: ModelEvent): Unit = {
    sender ! ModelEventDTO(event)
  }

  // TODO: this should be pumping events identically to the ETJobDispatcher actor.
  // I need a test to fail this configuration, but my head's spinning figuring out how to manage that...
  override def receive: Receive = {
    case ExecuteRequestDTO(node, table) =>
      try {
        val injector = Guice.createInjector(module)
        val jobFactory = injector.instance[ExternalToolExecutorJob.Factory]
        val job = jobFactory.create(node, table)
        job.run()
        sender ! ExecuteResponseDTO(table)
      }
      catch {
        // NonFatal rather than Throwable: failures of the job are reported to the
        // requester, while fatal conditions (VirtualMachineError, InterruptedException,
        // ...) propagate instead of being packaged up as an ordinary job error.
        case NonFatal(exception) =>
          log.info("saw exception attempting to evaluate " + table)
          log.log(Level.FINE, "aforementioned exception", exception)
          sender ! ExecuteErrorDTO(exception)
      }
    case ModelEventDTO(event) =>
      eventBus.post(event)
    case unexpected =>
      // name the offending message so the failure can be diagnosed from the stack trace
      throw new UnsupportedOperationException(s"ProxyActor cannot handle message: $unexpected")
  }
}
/** Companion of [[ProxyActor]]: names, defaults and the entry point that hosts the actor. */
object ProxyActor {

  /** Props for creating a [[ProxyActor]]. */
  def props(): Props = Props(classOf[ProxyActor])

  val DefaultEventBus = new EventBus("ET-Task")
  val ActorName = "ET-Proxy"
  val SystemName: String = "OASIS-ET-Proxy-Side"

  // Resource path of the Akka configuration; "/com/empowerops/grid2/ET-remote.conf"
  // is the fallback when getEnvString yields nothing for "ConfigComPath".
  val ConfigComPath = getEnvString(classOf[ProxyActor], "ConfigComPath")
    .orElse("/com/empowerops/grid2/ET-remote.conf")

  val thing = 4

  /** Starts the actor system named `SystemName` and registers a [[ProxyActor]] as `ActorName`. */
  def main(args: Array[String]): Unit = {
    // locate and parse the configuration resource
    val resources = new Factory().forServiceWithoutView(classOf[ProxyActor])
    val configFile = resources.loadResourceAt(ConfigComPath).asFile()
    val config = ConfigFactory.parseFile(configFile)

    // bring up the system and register the actor in it
    val system = ActorSystem(SystemName, config)
    system.actorOf(props(), name = ActorName)

    println("remote is ready")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment