Skip to content

Instantly share code, notes, and snippets.

@doswell
Created June 28, 2013 21:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save doswell/5888270 to your computer and use it in GitHub Desktop.
Save doswell/5888270 to your computer and use it in GitHub Desktop.
Concurrent.patchPanel only forwards data from last added Enumerator
package controllers
import play.api._
import play.api.mvc._
import scala.concurrent.duration._
import akka.actor.{Actor, Props}
import play.api.libs.concurrent.Akka
import play.api.libs.concurrent.Execution.Implicits._
import controllers._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent._
import play.api.libs.iteratee._
import play.api.libs.concurrent._
/**
 * Demo controller. The first request to `index` forces `started`, which
 * creates the consumer and patch-panel actors and schedules the periodic
 * messages that drive them.
 */
object Application extends Controller {
  import play.api.Play.current

  implicit val timeout = Timeout(1.seconds)

  // The two message generators; created on first use.
  lazy val actorA = Akka.system.actorOf(Props(new TestActorGenerator("A")), name = "actorA")
  lazy val actorB = Akka.system.actorOf(Props(new TestActorGenerator("B")), name = "actorB")

  /**
   * One-time wiring of the demo. Being a `lazy val`, the body runs only the
   * first time it is referenced; the value itself is always `true`.
   */
  lazy val started = {
    val system = Akka.system

    // One direct consumer per generator.
    system.actorOf(Props(new TestActorConsumer("A", actorA)), name = "consumeA")
    system.actorOf(Props(new TestActorConsumer("B", actorB)), name = "consumeB")

    // The panel actor, plus a consumer reading the panel's output.
    val panel = system.actorOf(Props(new TestActorPanelConsumer("C", actorA, actorB)), name = "patchPanel")
    system.actorOf(Props(new TestActorConsumer("C", panel)), name = "consumeC")

    // Each generator is told to emit a message once per second.
    system.scheduler.schedule(0.seconds, 1.seconds, actorA, SendMsg())
    system.scheduler.schedule(0.seconds, 1.seconds, actorB, SendMsg())

    // Every 20 seconds each generator is re-added to the panel; B is offset
    // by 10 seconds from A.
    system.scheduler.schedule(0.seconds, 20.seconds, panel, Add(actorA))
    system.scheduler.schedule(10.seconds, 20.seconds, panel, Add(actorB))

    // Immediate one-off adds in addition to the scheduled ones.
    panel ! Add(actorA)
    panel ! Add(actorB)

    true
  }

  // val consume = Iteratee.consume[String]{case s => println(s)}

  /** Forces the demo wiring (see `started`) and renders the index view. */
  def index = Action {
    started
    Ok(views.html.index("Your new application is ready."))
  }
}
package controllers
import scala.concurrent.duration._
import scala.concurrent._
import akka.actor.{Actor, Props, ActorRef}
import play.api.libs.concurrent.Akka
import play.api.libs.concurrent.Execution.Implicits._
import play.api.libs.iteratee._
import play.api.libs.concurrent._
import akka.util.Timeout
import akka.pattern.ask
/** Request for an actor's output stream; the actors in this file answer it with a `Connected`. */
case class Join()

/** Tells a `TestActorGenerator` to push one message onto its broadcast channel. */
case class SendMsg()

/** Reply to `Join`, carrying the enumerator the requester can consume. */
case class Connected(enumerator: Enumerator[String])

/** NOTE(review): not referenced by any code visible in this file. */
case class Ready()
/**
 * Helper for joining a generator actor's message stream by name.
 */
object TestActor {
  implicit val timeout = Timeout(1 second)
  import play.api.Play.current

  /**
   * Looks up the top-level actor named `"actor" + actorName` and asks it to
   * `Join`.
   *
   * @param actorName suffix of the actor's name, e.g. `"A"` for `actorA`
   * @return a future of the enumerator carried by the `Connected` reply; the
   *         future fails if no reply arrives within `timeout` or if the reply
   *         is not a `Connected`
   */
  def join(actorName:String):Future[Enumerator[String]] = {
    // Actors created with system.actorOf live under the /user guardian.
    // actorFor performs an exact path lookup and does not expand wildcards,
    // so the previous "*/actor..." path resolved to dead letters and the ask
    // below could only time out.
    val joiningActor = Akka.system.actorFor("/user/actor" + actorName)
    (joiningActor ? Join()) map {
      case Connected(enumerator) =>
        println("connectede")
        enumerator
    }
  }
}
/**
 * Actor that owns a broadcast enumerator/channel pair. `Join` is answered with
 * the enumerator; each `SendMsg` pushes one string onto the channel.
 *
 * @param actorName label prefixed to every message this actor pushes
 */
class TestActorGenerator(actorName:String) extends Actor {
  import play.api.Play.current

  // Enumerator handed out to joiners, and the channel used to feed it.
  val (actorEnumerator, actorChannel) = Concurrent.broadcast[String]

  def receive = {
    case SendMsg() =>
      actorChannel.push(actorName + " sent message ")
    case Join() =>
      sender ! Connected(actorEnumerator)
  }
}
/**
 * Actor that, on construction, asks `actorRef` to `Join` and prints every
 * string produced by the enumerator it gets back.
 *
 * @param name     label prefixed to each printed line
 * @param actorRef actor expected to answer `Join` with a `Connected`
 */
class TestActorConsumer(name:String, actorRef:ActorRef) extends Actor {
  import play.api.Play.current
  implicit val timeout = Timeout(1.seconds)

  // Started once, when the actor instance is created. Nothing in this class
  // reads the resulting future, so a timed-out ask goes unreported.
  val enum = (actorRef ? Join()).map {
    case Connected(source) =>
      println("Consumer hooked up")
      val printer = Iteratee.foreach[String](msg => println(name + " caught " + msg))
      source |>> printer
  }

  def apply() {}

  // Join is accepted and ignored.
  def receive = {
    case Join() => ()
  }
}
/** Asks a `TestActorPanelConsumer` to join `actor` and patch the stream it returns into the panel. */
case class Add(actor:ActorRef)
/**
 * Actor exposing one output stream (`outEnum`) built with
 * `Concurrent.patchPanel`. `Add(actor)` joins `actor` and patches the
 * enumerator it returns into the panel; `Join` hands `outEnum` to the sender.
 *
 * NOTE(review): per the Play API docs, `patchIn` replaces the currently
 * patched-in enumerator rather than merging, so only the most recently added
 * source is forwarded — confirm against the Play version in use.
 *
 * @param name      label used in this actor's diagnostic output
 * @param actorRefA not used by this class
 * @param actorRefB not used by this class
 */
class TestActorPanelConsumer(name:String, actorRefA:ActorRef, actorRefB:ActorRef) extends Actor {
  import play.api.Play.current
  // Imported locally because the file itself only imports akka.pattern.ask.
  import akka.pattern.pipe
  implicit val timeout = Timeout(1 seconds)

  // Set by the patchPanel callback below and null until that callback has run.
  // NOTE(review): the callback appears to run when a consumer is applied to
  // outEnum, on the consumer's thread — hence @volatile; confirm for the Play
  // version in use.
  @volatile var p:Concurrent.PatchPanel[String]=null
  val outEnum = Concurrent.patchPanel[String]{patcher =>
    p = patcher
  }

  def apply(): Unit = println(outEnum)

  /** Patches `enumerator` into the panel, or reports that no panel exists yet. */
  private def patch(enumerator: Enumerator[String]): Unit = {
    Option(p) match {
      case Some(panel) =>
        panel.patchIn(enumerator)
      case None =>
        // Previously this was a NullPointerException swallowed inside a Future.
        println(name + ": patch panel not initialised yet, dropping enumerator")
    }
  }

  def receive = {
    case Join() =>
      sender ! Connected(outEnum)

    case Add(actor) =>
      println("Adding " + actor)
      // Pipe the reply back to this actor instead of touching `p` from the
      // future's callback thread; it arrives as the Connected case below.
      (actor ? Join()) pipeTo self

    case Connected(enumerator) =>
      patch(enumerator)

    case akka.actor.Status.Failure(cause) =>
      // pipeTo delivers a failed ask (e.g. a timeout) as Status.Failure.
      println(name + ": join failed: " + cause)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment