anonymous / Application.scala
Created

Embed URL

HTTPS clone URL

SSH clone URL

You can clone with HTTPS or SSH.

Download Gist

Using Enumerator interleave to combine multiple enumerators to an interee. This is a fixed version of; https://gist.github.com/doswell/5888270

View Application.scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
package controllers
 
import play.api._
import play.api.mvc._
import scala.concurrent.duration._
import akka.actor.{Actor, Props}
import play.api.libs.concurrent.Akka
import play.api.libs.concurrent.Execution.Implicits._
import controllers._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent._
import play.api.libs.iteratee._
 
import play.api.libs.concurrent._
 
 
object Application extends Controller {
import play.api.Play.current
implicit val timeout = Timeout(1 seconds)
lazy val actorA = Akka.system.actorOf(Props(new TestActorGenerator("A")),name="actorA")
lazy val actorB = Akka.system.actorOf(Props(new TestActorGenerator("B")),name="actorB")
lazy val started = {
val consumeA = Akka.system.actorOf(Props(new TestActorConsumer("A",actorA)),name="consumeA")
val consumeB = Akka.system.actorOf(Props(new TestActorConsumer("B",actorB)),name="consumeB")
val patchPanel = Akka.system.actorOf(Props(new TestActorPanelConsumer("C",actorA,actorB)),name="patchPanel")
val consumeC = Akka.system.actorOf(Props(new TestActorConsumer("C",patchPanel)),name="consumeC")
Akka.system.scheduler.schedule(0 seconds, 1 seconds, actorA, SendMsg())
Akka.system.scheduler.schedule(0 seconds, 1 seconds, actorB, SendMsg())
Akka.system.scheduler.schedule(0 seconds, 20 seconds, patchPanel, Add(actorA))
Akka.system.scheduler.schedule(10 seconds, 20 seconds, patchPanel, Add(actorB))
patchPanel ! Add(actorA)
patchPanel ! Add(actorB)
true
}
// val consume = Iteratee.consume[String]{case s => println(s)}
def index = Action {
import play.api.Play.current
started
Ok(views.html.index("Your new application is ready."))
}
}
View Application.scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
package controllers
 
import scala.concurrent.duration._
import scala.concurrent._
import akka.actor.{Actor, Props, ActorRef}
import play.api.libs.concurrent.Akka
import play.api.libs.concurrent.Execution.Implicits._
import play.api.libs.iteratee._
import play.api.libs.concurrent._
import akka.util.Timeout
import akka.pattern.ask
 
case class Join()
case class SendMsg()
case class Connected(
enumerator: Enumerator[String]
)
case class Ready()
 
object TestActor {
implicit val timeout = Timeout(1 second)
import play.api.Play.current
def join(actorName:String):Future[Enumerator[String]] = {
val joiningActor = Akka.system.actorFor("*/actor"+actorName)
(joiningActor ? Join()) map {
case Connected(enumerator) =>
println("connectede")
enumerator
}
}
 
}
 
class TestActorGenerator(actorName:String) extends Actor {
val (actorEnumerator, actorChannel) = Concurrent.broadcast[String]
import play.api.Play.current
 
def receive = {
case Join() => {
sender ! Connected(actorEnumerator)
}
case SendMsg() => {
actorChannel.push(actorName + " sent message ")
}
}
 
}
 
class TestActorConsumer(name:String, actorRef:ActorRef) extends Actor {
import play.api.Play.current
implicit val timeout = Timeout(1 seconds)
 
val enum = (actorRef ? Join()) map {
case Connected(enumerator) =>
println("Consumer hooked up")
enumerator |>> Iteratee.foreach[String] { str => println(name + " caught " + str)}
}
 
 
def apply() {
}
 
def receive = {
case Join() => {}
}
}
case class Add(actor:ActorRef)
class TestActorPanelConsumer(name:String, actorRefA:ActorRef, actorRefB:ActorRef) extends Actor {
import play.api.Play.current
import scala.collection.mutable.Seq
implicit val timeout = Timeout(1 seconds)
var p:Concurrent.PatchPanel[String]=null
val outEnum = Concurrent.patchPanel[String]{patcher =>
p = patcher
}
var enums:Seq[Enumerator[String]] = Seq()
 
def apply() {
println (outEnum)
}
 
def receive = {
case Join() => {
sender ! Connected(outEnum)
}
case Add(actor) => {
println("Adding " + actor toString)
(actor ? Join()) map {
case Connected(enumerator) => {
enums = enums :+ enumerator
p.patchIn(Enumerator.interleave[String](enums))
}
}
}
case Connected(enumerator) => {
enums = enums :+ enumerator
//Combines the enumerators together, then swaps it in to the patch panel
p.patchIn(Enumerator.interleave[String](enums))
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.