Skip to content

Instantly share code, notes, and snippets.

View RayRoestenburg's full-sized avatar

Raymond Roestenburg RayRoestenburg

View GitHub Profile
/**
* A trait to wire tap every message that is processed by an Actor
* A listener is sent every message that the actor received, after
* the actor has processed it.
*/
trait WireTap extends Actor {
def listener: ActorRef
abstract override def receive = {
case m =>
@RayRoestenburg
RayRoestenburg / gist:3518897
Created August 29, 2012 21:00
Timingdiagnostics
case class TimeDataRequest(id:Long)
/**
* A Diagnostics that records timing based on the Id of the message
*/
trait TimingDiagnostics extends Diagnostics[(Long,Long),TimeDataRequest] {
private var map = Map[Long, Long]()
var timeBefore:Long = 0
def diagnoseBefore:Receive = {
@RayRoestenburg
RayRoestenburg / gist:3518821
Created August 29, 2012 20:55
diagnostics
trait Diagnostics[Data, Request] extends WebNode[Data, Request] {
override def sendSpiders(spiderHome: ActorRef, data: Data, msg: (Request, Spider), collected: Set[ActorRef]) {
spiderHome ! DiagnosticData[Data](data, now, selfNode)
super.sendSpiders(spiderHome, data, msg, collected)
}
override def before = diagnoseBefore
override def after = diagnoseAfter
@RayRoestenburg
RayRoestenburg / gist:3518699
Created August 29, 2012 20:45
Spider, WebTrail, WebNodeRef
/**
* A spider which has a home, and leaves a trail on the web
*/
case class Spider(home:ActorRef, trail:WebTrail= WebTrail())
/**
* A trail on the web.
*/
case class WebTrail(collected:Set[ActorRef]= Set(), uuid:UUID = UUID.randomUUID())
@RayRoestenburg
RayRoestenburg / gist:3518677
Created August 29, 2012 20:43
handlerequest
def handleRequest:Receive = {
case (req:Request, spider @ Spider(ref,WebTrail(collected, uuid))) if !lastId.exists(_ == uuid) =>
lastId = Some(uuid)
collect(req).map { data =>
sendSpiders(ref, data, (req,spider), collected)
}
}
@RayRoestenburg
RayRoestenburg / gist:3518521
Created August 29, 2012 20:32
wrappedreceive
def wrappedReceive:Receive = {
case m:Any if ! m.isInstanceOf[(Request,Spider)] =>
recordInput(sender)
before(m)
super.receive(m)
after(m)
}
trait WebNode[Data,Request] extends Actor with Node {
// pathways coming into the node
protected val in = mutable.Set[ActorRef]()
// pathways going out of the node
protected val out = mutable.Set[ActorRef]()
// used to only handle a request once that travels
// through the web
protected var lastId:Option[UUID] = None
@RayRoestenburg
RayRoestenburg / gist:3518294
Created August 29, 2012 20:17
A node trait
trait Node { actor:Actor =>
def send(actorRef:ActorRef, m:Any) { actorRef.tell(m) }
def reply(m:Any) { sender ! m }
def forward(actorRef:ActorRef, m:Any) { actorRef.forward(m) }
def actorOf(props:Props):ActorRef = actor.context.actorOf(props)
def actorFor(actorPath:ActorPath):ActorRef = actor.context.actorFor(actorPath)
}
@RayRoestenburg
RayRoestenburg / testReplyAfterProcessing.scala
Created September 1, 2011 18:10
Test example using ReplyAfterProcessing
class SomeSpec extends WordSpec {
"A oneway actor" should {
"Tell me when it's finished "in {
val testActor = Actor.actorOf(new SomeActor() with ReplyAfterProcessing)
var reply = testActor !! (message, 1000)
if(reply.isEmpty) fail("some message")
}
}
}
@RayRoestenburg
RayRoestenburg / ReplyAfterProcessing.scala
Created September 1, 2011 18:02
Stackable trait for one-way actor testing
import akka.actor.Actor
trait ReplyAfterProcessing extends Actor {
abstract override def receive = {
super.receive andThen {
case msg => self.reply_?(msg)
}
}
}