Last active
September 20, 2015 20:51
-
-
Save Groostav/a7660234c2059dabce35 to your computer and use it in GitHub Desktop.
First crack at Akka and Scala! Please critique!
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.empowerops.common; | |
import java.io.ByteArrayInputStream; | |
public class AKKAXStreamXMLSerializer extends akka.serialization.JSerializer { | |
//TODO replace with guice injection | |
private final Serializer serializer = new Serializer( | |
SerializationConfiguration.Factory.Default, | |
new Serializer.CustomConverterPackage(null, null), | |
new OASISSnakeYAMLConstructor(), | |
new OASISSnakeYAMLRepresenter() | |
); | |
@Override public Object fromBinaryJava(byte[] bytes, Class<?> manifest) { | |
return serializer.getXMLSerializer().fromXML(new ByteArrayInputStream(bytes)); | |
} | |
@Override public int identifier() { | |
return 872391; | |
} | |
@Override public byte[] toBinary(Object o) { | |
return serializer.toXML(o).getBytes(); | |
} | |
@Override public boolean includeManifest() { | |
return false; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Remoting layer: starts from the settings in the "base" config and adds what
# an actor system needs in order to talk to actors in another JVM.
include "base" | |
akka { | |
actor { | |
# replace the default actor-ref provider with the remoting one
provider = "akka.remote.RemoteActorRefProvider" | |
} | |
remote { | |
enabled-transports = ["akka.remote.netty.tcp"] | |
netty.tcp { | |
# loopback address; NOTE(review): presumably has to change before running across machines
hostname = "127.0.0.1" | |
} | |
# logging of remote messages is switched on in both directions
log-sent-messages = on | |
log-received-messages = on | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Base settings: log level plus the serializer registration and bindings.
akka { | |
loglevel = "INFO" | |
actor { | |
serializers { | |
# registers the XML serializer class under the short name "xstream"
xstream = "com.empowerops.common.AKKAXStreamXMLSerializer" | |
} | |
serialization-bindings { | |
# every java.lang.Object is bound to the xstream serializer
"java.lang.Object" = xstream | |
# NOTE(review): "none" presumably switches off the default binding for Serializable -- confirm in the Akka serialization docs
"java.io.Serializable" = none | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.empowerops.grid2 | |
import com.empowerops.algorithms.SymbolTable | |
import com.empowerops.common.ModelEvent | |
import com.empowerops.front_end.model.ExternalToolProxyNode | |
/** Root of the closed family of messages declared in this file. */
sealed trait AKKAMessage

/** The two possible outcomes of an execution request: a result table or an error. */
sealed trait AKKAExecuteResponseMessage extends AKKAMessage

/** Carries a model event from one actor to another. */
final case class ModelEventDTO(event: ModelEvent)
  extends AKKAMessage

//  class OptionDTO(option: String, arg: String) //no no I can use a scala tuple! :D now to figure out tuple syntax...
//  case class ExecuteRequestDTO(executable: Path, options: Set[OptionDTO], inputFile: Path, outputFile: Path, timeout: time.Duration, inputsByName: Map[String, Double])

/** Request to execute `node` against the values in `symbolTable`. */
final case class ExecuteRequestDTO(node: ExternalToolProxyNode, symbolTable: SymbolTable)
  extends AKKAMessage

/** Successful outcome: the symbol table after execution. */
final case class ExecuteResponseDTO(symbolTable: SymbolTable)
  extends AKKAExecuteResponseMessage

/** Failed outcome: the exception raised while executing. */
final case class ExecuteErrorDTO(exception: Throwable)
  extends AKKAExecuteResponseMessage
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Adds a fixed TCP port on top of the "base-remote" settings.
include "base-remote" | |
akka { | |
remote { | |
netty.tcp { | |
# 5150 is the port hard-coded in JobDispatcherActor's actor-selection address
port = 5150 | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.empowerops.grid2; | |
import com.empowerops.algorithms.SymbolTable; | |
import com.empowerops.common.ModelEvent; | |
import com.empowerops.common.assists.CommonFixtureBase; | |
import com.empowerops.common.assists.Count; | |
import com.empowerops.common.assists.CountingEventBus; | |
import com.empowerops.common.documentation.ReflectionSensitive; | |
import com.empowerops.common.model.VariableSymbol; | |
import com.empowerops.front_end.execution.ExternalToolExecutorJob; | |
import com.empowerops.front_end.model.ExternalToolProxyNode; | |
import com.empowerops.linqalike.Queryable; | |
import com.google.inject.Module; | |
import org.junit.After; | |
import org.junit.Before; | |
import org.junit.Test; | |
import static com.empowerops.assists.Exceptions.assertThrows; | |
import static com.empowerops.common.BootstrappingUtilities.getParamName; | |
import static com.empowerops.common.BootstrappingUtilities.popProperty; | |
import static com.empowerops.common.BootstrappingUtilities.pushProperty; | |
import static com.empowerops.common.model.VariableSymbolBuilder.*; | |
import static com.empowerops.linqalike.common.Tuple.pair; | |
import static org.assertj.core.api.Assertions.assertThat; | |
import static org.mockito.Matchers.eq; | |
import static org.mockito.Mockito.*; | |
/** | |
* Please note this test system tests a number of classes, namely: | |
* <ul> | |
* <li>{@link com.empowerops.grid2.Host}</li> | |
* <li>{@link com.empowerops.grid2.ProxyActor}</li> | |
* <li>{@link com.empowerops.grid2.JobDispatcherActor}</li> | |
* </ul> | |
* | |
* Created by Geoff on 2015-09-11. | |
*/ | |
public class ExternalToolActorSystemAcceptanceTests extends CommonFixtureBase{ | |
private static final VariableSymbol Y1 = objective("Y1").build(); | |
private static final Queryable<VariableSymbol> inputs = inputs("in1", "in2", "in3").build(); | |
private static final Class<ProxyActor> PAClass = ProxyActor.class; | |
private static final Class<Host> HostClass = Host.class; | |
private ExternalToolProxyNode node; | |
private SymbolTable runningTable; | |
private static @ReflectionSensitive CountingEventBus eventBusForTest; | |
private static @ReflectionSensitive Module moduleForTest; | |
private ExternalToolExecutorJob.Factory factory; | |
@Before | |
public void setup_node_and_some_simple_vars() { | |
node = new ExternalToolProxyNode(); | |
node.getProducedVariables().add(Y1); | |
node.getConsumedVariables().addAll(inputs); | |
runningTable = new SymbolTable(); | |
runningTable.addAll(pair(inputs.first(), 1.0), pair(inputs.second(), 2.0), pair(inputs.last(), 3.0)); | |
} | |
@Before | |
public void setup_akka_config() { | |
factory = mock(ExternalToolExecutorJob.Factory.class); | |
moduleForTest = binder -> binder.bind(ExternalToolExecutorJob.Factory.class).toInstance(factory); | |
eventBusForTest = new CountingEventBus(); | |
pushProperty(getParamName(PAClass, "module"), "com.empowerops.grid2.ExternalToolProxyTaskFixture.moduleForTest"); | |
pushProperty(getParamName(PAClass, "eventBus"), "com.empowerops.grid2.ExternalToolProxyTaskFixture.eventBusForTest"); | |
pushProperty(getParamName(PAClass, "ConfigComPath"), "/com/empowerops/grid2/base.conf"); | |
pushProperty(getParamName(HostClass, "ConfigComPath"), "/com/empowerops/grid2/base.conf"); | |
} | |
@After | |
public void remove_akka_config_from_jvm() { | |
popProperty(getParamName(HostClass, "ConfigComPath")); | |
popProperty(getParamName(PAClass, "ConfigComPath")); | |
popProperty(getParamName(PAClass, "eventBus")); | |
popProperty(getParamName(PAClass, "module")); | |
} | |
@Test | |
public void when_dispatching_a_valid_job_should_create_and_run_ET_execution_job() { | |
//setup | |
ExternalToolExecutorJob job = mock(ExternalToolExecutorJob.class); | |
when(factory.create(eq(node), eq(runningTable))).thenReturn(job); | |
doAnswer(invoc -> runningTable.put(Y1, 42.0)).when(job).run(); | |
Host host = new Host(eventBus, resourceEnvironmentFactory); | |
//act | |
SymbolTable result = host.execute(node, runningTable); | |
//assert | |
verify(factory, once()).create(eq(node), eq(runningTable)); | |
verify(job, once()).run(); | |
assertThat(result.getValueFor(node.getProducedVariables().single())).isEqualTo(42.0); | |
} | |
@Test | |
@SuppressWarnings("ThrowableResultOfMethodCallIgnored") // part of the test | |
public void when_dispatching_a_job_that_throws_exception_should_be_exposed_to_caller(){ | |
//setup | |
ExternalToolExecutorJob job = mock(ExternalToolExecutorJob.class); | |
when(factory.create(eq(node), eq(runningTable))).thenReturn(job); | |
doThrow(new TestException()).when(job).run(); | |
Host host = new Host(eventBus, resourceEnvironmentFactory); | |
//act & a little assert | |
assertThrows(TestException.class, () -> host.execute(node, runningTable)); | |
//more assert | |
verify(factory, once()).create(eq(node), eq(runningTable)); | |
verify(job, once()).run(); | |
} | |
public static class TestException extends RuntimeException{ | |
private static final long serialVersionUID = 4185502332377683589L; | |
} | |
@Test | |
public void when_dispatching_a_job_that_posts_an_event_we_should_recieve_that_event(){ | |
//setup | |
ExternalToolExecutorJob job = mock(ExternalToolExecutorJob.class); | |
when(factory.create(eq(node), eq(runningTable))).thenReturn(job); | |
doAnswer(invoc -> { eventBusForTest.post(new NeatoEvent()); return null; }).when(job).run(); | |
Host host = new Host(eventBus, resourceEnvironmentFactory); | |
//act | |
host.execute(node, runningTable); | |
//assert | |
verify(factory, once()).create(node, runningTable); | |
verify(job, once()).run(); | |
eventBus.shouldHaveBeenAskedToPost(Count.Once, NeatoEvent.class); | |
} | |
public static class NeatoEvent implements ModelEvent {} | |
@Test | |
public void when_posting_an_event_locally_that_event_makes_it_to_all_actors(){ | |
//setup | |
when(factory.create(eq(node), eq(runningTable))).thenReturn(mock(ExternalToolExecutorJob.class)); | |
Host host = new Host(eventBus, resourceEnvironmentFactory); | |
//act | |
host.execute(node, runningTable); | |
eventBus.post(new NeatoEvent()); | |
//assert | |
eventBusForTest.shouldHaveBeenAskedToPost(Count.Once, NeatoEvent.class); | |
} | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Same "base-remote" settings with the TCP port set to 0.
include "base-remote" | |
akka { | |
remote { | |
netty.tcp { | |
# NOTE(review): port 0 presumably asks for an automatically chosen free port -- confirm in the Akka remoting docs
port = 0 | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.empowerops.grid2 | |
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit} | |
import akka.actor.{ActorSystem, Props} | |
import com.empowerops.algorithms.SymbolTable | |
import com.empowerops.common.persistence.ResourceEnvironment | |
import com.empowerops.common.persistence.ResourceEnvironment.Factory | |
import com.empowerops.common.{BootstrappingUtilities, EventBus} | |
import com.empowerops.front_end.model.ExternalToolProxyNode | |
import com.typesafe.config.ConfigFactory | |
/**
 * Blocking front door to the dispatcher-side actor system.
 *
 * Constructing a `Host` starts an actor system (configured from the resource named by
 * [[Host.ConfigComPath]]) and a [[JobDispatcherActor]] inside it. [[execute]] then turns the
 * asynchronous request/reply exchange into an ordinary method call.
 *
 * All calls share one single-slot reply queue, so `execute` must not be called concurrently
 * on the same instance. Call [[shutdown]] when the host is no longer needed.
 *
 * Created by Geoff on 2015-09-12.
 */
class Host(eventBus: EventBus, envFactory: ResourceEnvironment.Factory) {

  // Create an Akka system
  val env = envFactory.forServiceWithoutView(classOf[Host])
  val config = ConfigFactory.parseFile(env.loadResourceAt(Host.ConfigComPath).asFile)
  val system = ActorSystem("OASIS-ET-Dispatcher-Side", config)

  // single-slot hand-off the dispatcher actor uses to pass the reply back to the calling thread
  val completionSignal = new ArrayBlockingQueue[AKKAExecuteResponseMessage](1)

  // add the dispatcher to the system
  val dispatcherActor = system.actorOf(
    props = Props(new JobDispatcherActor(eventBus, completionSignal)),
    name = "Dispatcher"
  )

  /**
   * Sends `node` and `symbolTable` to the dispatcher and blocks until a reply arrives.
   *
   * @return the symbol table carried by the reply
   * @throws IllegalStateException if no reply arrives within [[Host.ReplyTimeoutSeconds]] seconds
   * @throws Throwable whatever exception the reply reports, rethrown as-is
   */
  def execute(node: ExternalToolProxyNode, symbolTable: SymbolTable): SymbolTable = {
    // A reply to an earlier call that timed out may have arrived since; without this it would
    // be handed to the current caller. (A reply that lands after this line can still be
    // mistaken for ours -- closing that gap needs a correlation id on the messages.)
    completionSignal.clear()

    // start the calculation
    dispatcherActor ! ExecuteRequestDTO(node, symbolTable)

    // wait for the job to complete; poll returns null on timeout, which Option turns into None
    Option(completionSignal.poll(Host.ReplyTimeoutSeconds, TimeUnit.SECONDS)) match {
      case Some(ExecuteResponseDTO(updatedTable)) => updatedTable
      case Some(ExecuteErrorDTO(exception))       => throw exception
      case None =>
        throw new IllegalStateException(
          s"no reply from the dispatcher within ${Host.ReplyTimeoutSeconds} seconds"
        )
    }
  }

  /** Stops the actor system started by this host, releasing its threads. */
  def shutdown(): Unit = system.shutdown()
}

object Host {

  /** Resource path of the host's config; taken from the environment when set there. */
  val ConfigComPath = BootstrappingUtilities.getEnvString(classOf[Host], "ConfigComPath").orElse("/com/empowerops/grid2/Host-remote.conf")

  /** How long [[Host.execute]] waits for a reply before giving up. */
  val ReplyTimeoutSeconds: Long = 5L

  /** Manual smoke test: starts a host, runs one empty job, then stops the actor system. */
  def main(args: Array[String]): Unit = {
    val host = new Host(new EventBus("Testing Host"), new Factory)
    try {
      host.execute(new ExternalToolProxyNode, new SymbolTable)
    } finally {
      // without this the actor system's threads keep the JVM alive after main returns
      host.shutdown()
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.empowerops.grid2 | |
import java.util | |
import akka.actor.Actor | |
import akka.routing.RoundRobinRouter | |
import com.empowerops.common.{EventBus, ModelEvent} | |
import com.empowerops.front_end.execution.ExternalToolProxyTask | |
import com.google.common.eventbus.Subscribe | |
import scala.collection.immutable.HashSet | |
/**
 * Dispatcher-side actor: forwards execution requests to the worker, hands the worker's reply
 * to the waiting caller through `signal`, and relays model events between the local
 * `eventBus` and the worker.
 *
 * Events travel in both directions, so each one is tracked in `syndicatedEvents` to stop it
 * bouncing back: the first sighting relays the event and records it, the second sighting
 * (its echo) only clears the record.
 *
 * Created by Geoff on 2015-09-12.
 *
 * @param eventBus local bus this actor subscribes to and posts remote events on
 * @param signal   queue on which execution replies are offered
 */
class JobDispatcherActor(eventBus: EventBus,
                         signal: util.Queue[AKKAExecuteResponseMessage])
  extends Actor {

  val workerActor = context.actorSelection(
    s"akka.tcp://${ProxyActor.SystemName}@127.0.0.1:5150/user/${ProxyActor.ActorName}"
  )

  // NOTE(review): nothing in this class sends to this router; it is kept because creating it
  // starts a local ProxyActor, which other code may rely on.
  val workerRouter = context.actorOf(
    ProxyActor.props().withRouter(RoundRobinRouter(1)),
    name = "OASIS-ET-network-dispatcher"
  )

  var syndicatedEvents = new HashSet[ModelEvent]

  eventBus.register(this)

  /** Event-bus callback: relays a locally posted event to the worker unless it is an echo. */
  @Subscribe def pumpEventToActorsOn(event: ModelEvent): Unit = {
    if (firstSighting(event)) {
      workerActor ! ModelEventDTO(event)
    }
  }

  /** Posts an event received from the worker on the local bus unless it is an echo. */
  def pumpEventToLocalMachine(event: ModelEvent): Unit = {
    // The event is recorded BEFORE it is posted. This actor subscribes to the same bus, so
    // (assuming the bus delivers synchronously, as Guava's does) the post re-enters
    // pumpEventToActorsOn; recording afterwards let that callback treat the event as new and
    // send it straight back to the worker.
    if (firstSighting(event)) {
      eventBus.post(event)
    }
  }

  /**
   * Toggles `event` in `syndicatedEvents`.
   *
   * Synchronized because the event-bus callback runs on whichever thread posts the event,
   * not necessarily the thread processing this actor's messages. The lock is released
   * before any event is posted or sent.
   *
   * @return true when the event was not yet recorded (and now is), false when it was
   *         recorded (and has now been removed)
   */
  private def firstSighting(event: ModelEvent): Boolean = synchronized {
    if (syndicatedEvents.contains(event)) {
      syndicatedEvents -= event
      false
    } else {
      syndicatedEvents += event
      true
    }
  }

  override def receive = {
    case message: ExecuteRequestDTO =>
      workerActor ! message

    case ModelEventDTO(event) =>
      pumpEventToLocalMachine(event)

    case message: AKKAExecuteResponseMessage =>
      // offer does not block; its result is ignored, so a reply is dropped if the queue is full
      signal.offer(message)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.empowerops.grid2 | |
import java.util.logging.{Level, Logger} | |
import akka.actor._ | |
import com.empowerops.common.BootstrappingUtilities.{getEnv, getEnvString} | |
import com.empowerops.common.persistence.ResourceEnvironment.Factory | |
import com.empowerops.common.{AbstractModule, EventBus, ModelEvent} | |
import com.empowerops.front_end.execution.{ExternalToolExecutor, ExternalToolExecutorJob, ProcessInfoCollector, ProcessMonitor} | |
import com.empowerops.grid2.ProxyActor.DefaultEventBus | |
import com.google.common.eventbus.Subscribe | |
import com.google.inject.{Guice, Module} | |
import com.typesafe.config.ConfigFactory | |
import net.codingwell.scalaguice.InjectorExtensions._ | |
import net.codingwell.scalaguice.ScalaModule | |
/**
 * Worker-side actor: runs an external-tool job for every [[ExecuteRequestDTO]] and replies
 * with the updated table or the failure, and relays model events between its event bus and
 * the actor it last heard from.
 *
 * The event bus and the Guice module come from the environment when set there; otherwise
 * `ProxyActor.DefaultEventBus` and `RunningModule` are used.
 */
class ProxyActor extends Actor {

  val log = Logger.getLogger(classOf[ProxyActor].getCanonicalName)

  val eventBus = getEnv(classOf[ProxyActor], "eventBus", classOf[EventBus]).orElse(DefaultEventBus)
  val module = getEnv(classOf[ProxyActor], "module", classOf[Module]).orElse(RunningModule)

  // Sender of the most recent request or event. It is captured inside receive because
  // sender() is only meaningful while a message is being processed, whereas onLocalEvent is
  // invoked by the event bus on whichever thread posts. Declared before the registration
  // below so it is initialised before the first callback can arrive.
  @volatile private var eventPeer: Option[ActorRef] = None

  this.eventBus.register(this)

  /** Default bindings, used when no module is supplied through the environment. */
  object RunningModule extends AbstractModule with ScalaModule {
    override def configure(): Unit = {
      bind[EventBus].toInstance(eventBus)
      bindFactory(classOf[ExternalToolExecutor.Factory])
      bindFactory(classOf[ExternalToolExecutorJob.Factory])
      bindFactory(classOf[ProcessMonitor.Factory])
      bindFactory(classOf[ProcessInfoCollector.Factory])
    }
  }

  /** Event-bus callback: forwards a local event to the last known peer, if there is one. */
  @Subscribe def onLocalEvent(event: ModelEvent): Unit = {
    eventPeer.foreach(_ ! ModelEventDTO(event))
  }

  // TODO: this should be pumping events identically to the ETJobDispatcher actor
  // I need a test to fail this configuration, but my heads spinning figuring out how to manage that...
  override def receive: Receive = {
    case ExecuteRequestDTO(node, table) =>
      val requester = sender()
      eventPeer = Some(requester)
      try {
        val injector = Guice.createInjector(module)
        val jobFactory = injector.instance[ExternalToolExecutorJob.Factory]
        jobFactory.create(node, table).run()
        requester ! ExecuteResponseDTO(table)
      }
      catch {
        // only non-fatal failures are reported back; fatal ones (out-of-memory, interruption,
        // linkage errors) propagate to the actor's supervisor instead of being swallowed here
        case scala.util.control.NonFatal(exception) =>
          log.info("saw exception attempting to evaluate " + table)
          log.log(Level.FINE, "aforementioned exception", exception)
          requester ! ExecuteErrorDTO(exception)
      }

    case ModelEventDTO(event) =>
      eventPeer = Some(sender())
      eventBus.post(event)

    case other =>
      throw new UnsupportedOperationException(s"unsupported message: $other")
  }
}
/** Constants and the stand-alone entry point for the worker side. */
object ProxyActor {

  /** Name under which the worker actor is created in [[main]]. */
  val ActorName = "ET-Proxy"

  /** Name of the worker-side actor system. */
  val SystemName: String = "OASIS-ET-Proxy-Side"

  /** Event bus used when none is supplied through the environment. */
  val DefaultEventBus = new EventBus("ET-Task")

  /** Resource path of the worker's config; taken from the environment when set there. */
  val ConfigComPath = getEnvString(classOf[ProxyActor], "ConfigComPath").orElse("/com/empowerops/grid2/ET-remote.conf")

  val thing = 4

  /** Props for creating a [[ProxyActor]]. */
  def props(): Props = Props(classOf[ProxyActor])

  /** Starts the worker-side actor system with a single [[ProxyActor]] in it. */
  def main(args: Array[String]): Unit = {
    // locate the configuration resource and parse it
    val resourceEnv = new Factory().forServiceWithoutView(classOf[ProxyActor])
    val configFile = resourceEnv.loadResourceAt(ConfigComPath).asFile()
    val parsedConfig = ConfigFactory.parseFile(configFile)

    // bring up the actor system and register the worker under its well-known name
    val actorSystem = ActorSystem(SystemName, parsedConfig)
    actorSystem.actorOf(props(), name = ActorName)

    println("remote is ready")
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment