public
anonymous / Application.scala
Created

Using Enumerator interleave to combine multiple enumerators to an interee. This is a fixed version of; https://gist.github.com/doswell/5888270

  • Download Gist
Application.scala
Scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
package controllers
 
import play.api._
import play.api.mvc._
import scala.concurrent.duration._
import akka.actor.{Actor, Props}
import play.api.libs.concurrent.Akka
import play.api.libs.concurrent.Execution.Implicits._
import controllers._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent._
import play.api.libs.iteratee._
 
import play.api.libs.concurrent._
 
 
object Application extends Controller {
import play.api.Play.current
implicit val timeout = Timeout(1 seconds)
lazy val actorA = Akka.system.actorOf(Props(new TestActorGenerator("A")),name="actorA")
lazy val actorB = Akka.system.actorOf(Props(new TestActorGenerator("B")),name="actorB")
lazy val started = {
val consumeA = Akka.system.actorOf(Props(new TestActorConsumer("A",actorA)),name="consumeA")
val consumeB = Akka.system.actorOf(Props(new TestActorConsumer("B",actorB)),name="consumeB")
val patchPanel = Akka.system.actorOf(Props(new TestActorPanelConsumer("C",actorA,actorB)),name="patchPanel")
val consumeC = Akka.system.actorOf(Props(new TestActorConsumer("C",patchPanel)),name="consumeC")
Akka.system.scheduler.schedule(0 seconds, 1 seconds, actorA, SendMsg())
Akka.system.scheduler.schedule(0 seconds, 1 seconds, actorB, SendMsg())
Akka.system.scheduler.schedule(0 seconds, 20 seconds, patchPanel, Add(actorA))
Akka.system.scheduler.schedule(10 seconds, 20 seconds, patchPanel, Add(actorB))
patchPanel ! Add(actorA)
patchPanel ! Add(actorB)
true
}
// val consume = Iteratee.consume[String]{case s => println(s)}
def index = Action {
import play.api.Play.current
started
Ok(views.html.index("Your new application is ready."))
}
}
TestActor.scala
Scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
package controllers
 
import scala.concurrent.duration._
import scala.concurrent._
import akka.actor.{Actor, Props, ActorRef}
import play.api.libs.concurrent.Akka
import play.api.libs.concurrent.Execution.Implicits._
import play.api.libs.iteratee._
import play.api.libs.concurrent._
import akka.util.Timeout
import akka.pattern.ask
 
case class Join()
case class SendMsg()
case class Connected(
enumerator: Enumerator[String]
)
case class Ready()
 
object TestActor {
implicit val timeout = Timeout(1 second)
import play.api.Play.current
def join(actorName:String):Future[Enumerator[String]] = {
val joiningActor = Akka.system.actorFor("*/actor"+actorName)
(joiningActor ? Join()) map {
case Connected(enumerator) =>
println("connectede")
enumerator
}
}
 
}
 
class TestActorGenerator(actorName:String) extends Actor {
val (actorEnumerator, actorChannel) = Concurrent.broadcast[String]
import play.api.Play.current
 
def receive = {
case Join() => {
sender ! Connected(actorEnumerator)
}
case SendMsg() => {
actorChannel.push(actorName + " sent message ")
}
}
 
}
 
class TestActorConsumer(name:String, actorRef:ActorRef) extends Actor {
import play.api.Play.current
implicit val timeout = Timeout(1 seconds)
 
val enum = (actorRef ? Join()) map {
case Connected(enumerator) =>
println("Consumer hooked up")
enumerator |>> Iteratee.foreach[String] { str => println(name + " caught " + str)}
}
 
 
def apply() {
}
 
def receive = {
case Join() => {}
}
}
case class Add(actor:ActorRef)
class TestActorPanelConsumer(name:String, actorRefA:ActorRef, actorRefB:ActorRef) extends Actor {
import play.api.Play.current
import scala.collection.mutable.Seq
implicit val timeout = Timeout(1 seconds)
var p:Concurrent.PatchPanel[String]=null
val outEnum = Concurrent.patchPanel[String]{patcher =>
p = patcher
}
var enums:Seq[Enumerator[String]] = Seq()
 
def apply() {
println (outEnum)
}
 
def receive = {
case Join() => {
sender ! Connected(outEnum)
}
case Add(actor) => {
println("Adding " + actor toString)
(actor ? Join()) map {
case Connected(enumerator) => {
enums = enums :+ enumerator
p.patchIn(Enumerator.interleave[String](enums))
}
}
}
case Connected(enumerator) => {
enums = enums :+ enumerator
//Combines the enumerators together, then swaps it in to the patch panel
p.patchIn(Enumerator.interleave[String](enums))
}
}
}

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.