Created
June 28, 2013 21:26
-
-
Save doswell/5888270 to your computer and use it in GitHub Desktop.
Concurrent.patchPanel only forwards data from last added Enumerator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package controllers | |
import play.api._ | |
import play.api.mvc._ | |
import scala.concurrent.duration._ | |
import akka.actor.{Actor, Props} | |
import play.api.libs.concurrent.Akka | |
import play.api.libs.concurrent.Execution.Implicits._ | |
import controllers._ | |
import akka.pattern.ask | |
import akka.util.Timeout | |
import scala.concurrent._ | |
import play.api.libs.iteratee._ | |
import play.api.libs.concurrent._ | |
object Application extends Controller { | |
import play.api.Play.current | |
implicit val timeout = Timeout(1 seconds) | |
lazy val actorA = Akka.system.actorOf(Props(new TestActorGenerator("A")),name="actorA") | |
lazy val actorB = Akka.system.actorOf(Props(new TestActorGenerator("B")),name="actorB") | |
lazy val started = { | |
val consumeA = Akka.system.actorOf(Props(new TestActorConsumer("A",actorA)),name="consumeA") | |
val consumeB = Akka.system.actorOf(Props(new TestActorConsumer("B",actorB)),name="consumeB") | |
val patchPanel = Akka.system.actorOf(Props(new TestActorPanelConsumer("C",actorA,actorB)),name="patchPanel") | |
val consumeC = Akka.system.actorOf(Props(new TestActorConsumer("C",patchPanel)),name="consumeC") | |
Akka.system.scheduler.schedule(0 seconds, 1 seconds, actorA, SendMsg()) | |
Akka.system.scheduler.schedule(0 seconds, 1 seconds, actorB, SendMsg()) | |
Akka.system.scheduler.schedule(0 seconds, 20 seconds, patchPanel, Add(actorA)) | |
Akka.system.scheduler.schedule(10 seconds, 20 seconds, patchPanel, Add(actorB)) | |
patchPanel ! Add(actorA) | |
patchPanel ! Add(actorB) | |
true | |
} | |
// val consume = Iteratee.consume[String]{case s => println(s)} | |
def index = Action { | |
import play.api.Play.current | |
started | |
Ok(views.html.index("Your new application is ready.")) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package controllers | |
import scala.concurrent.duration._ | |
import scala.concurrent._ | |
import akka.actor.{Actor, Props, ActorRef} | |
import play.api.libs.concurrent.Akka | |
import play.api.libs.concurrent.Execution.Implicits._ | |
import play.api.libs.iteratee._ | |
import play.api.libs.concurrent._ | |
import akka.util.Timeout | |
import akka.pattern.ask | |
case class Join() | |
case class SendMsg() | |
case class Connected( | |
enumerator: Enumerator[String] | |
) | |
case class Ready() | |
object TestActor { | |
implicit val timeout = Timeout(1 second) | |
import play.api.Play.current | |
def join(actorName:String):Future[Enumerator[String]] = { | |
val joiningActor = Akka.system.actorFor("*/actor"+actorName) | |
(joiningActor ? Join()) map { | |
case Connected(enumerator) => | |
println("connectede") | |
enumerator | |
} | |
} | |
} | |
class TestActorGenerator(actorName:String) extends Actor { | |
val (actorEnumerator, actorChannel) = Concurrent.broadcast[String] | |
import play.api.Play.current | |
def receive = { | |
case Join() => { | |
sender ! Connected(actorEnumerator) | |
} | |
case SendMsg() => { | |
actorChannel.push(actorName + " sent message ") | |
} | |
} | |
} | |
class TestActorConsumer(name:String, actorRef:ActorRef) extends Actor { | |
import play.api.Play.current | |
implicit val timeout = Timeout(1 seconds) | |
val enum = (actorRef ? Join()) map { | |
case Connected(enumerator) => | |
println("Consumer hooked up") | |
enumerator |>> Iteratee.foreach[String] { str => println(name + " caught " + str)} | |
} | |
def apply() { | |
} | |
def receive = { | |
case Join() => {} | |
} | |
} | |
case class Add(actor:ActorRef) | |
class TestActorPanelConsumer(name:String, actorRefA:ActorRef, actorRefB:ActorRef) extends Actor { | |
import play.api.Play.current | |
implicit val timeout = Timeout(1 seconds) | |
var p:Concurrent.PatchPanel[String]=null | |
val outEnum = Concurrent.patchPanel[String]{patcher => | |
p = patcher | |
} | |
def apply() { | |
println (outEnum) | |
} | |
def receive = { | |
case Join() => { | |
sender ! Connected(outEnum) | |
} | |
case Add(actor) => { | |
println("Adding " + actor toString) | |
(actor ? Join()) map { | |
case Connected(enumerator) => { | |
p.patchIn(enumerator) | |
} | |
} | |
} | |
case Connected(enumerator) => { | |
p.patchIn(enumerator) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment