Last active
December 15, 2015 00:28
-
-
Save kcwill2/5172699 to your computer and use it in GitHub Desktop.
Akka EventBus using SubchannelClassification
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package event | |
import akka.event.ActorEventBus | |
import akka.event.SubchannelClassification | |
/**
 * Root of the classifier hierarchy used by the sim event dispatch.
 * Concrete event kinds are declared as subclasses of this class.
 */
class SimEventKind
/**
 * An event carried by the sim event dispatch.
 *
 * @param kind the class of the [[SimEventKind]] subclass that identifies what
 *             sort of event this is
 */
case class SimEvent(kind: Class[_ <: SimEventKind])
/**
 * The event dispatch module: an Akka event bus that delivers every published
 * [[SimEvent]] to the actors subscribed for it.
 *
 * It uses subchannel classification to implement sub-channel subscription
 * (i.e. sub-tree subscription): events are classified by their `kind`, and the
 * subclass relation between classifiers is the JVM class hierarchy below
 * [[SimEventKind]].
 *
 * `SimEvent` and `SimEventKind` are resolved from the enclosing `event` package.
 */
class EventDispatch extends ActorEventBus with SubchannelClassification {
  import akka.util.Subclassification

  type Event = SimEvent

  /** A classifier is the class object of any subclass of [[SimEventKind]]. */
  type Classifier = Class[_ <: SimEventKind]

  /** Orders classifiers by the JVM class hierarchy. */
  protected implicit val subclassification: Subclassification[Classifier] =
    new Subclassification[Classifier] {
      /** Two classifiers are equal when they are the same class object. */
      def isEqual(x: Classifier, y: Classifier): Boolean = x == y

      /** `x` is a sub-classifier of `y` when a value of class `x` is assignable to `y`. */
      def isSubclass(x: Classifier, y: Classifier): Boolean = y.isAssignableFrom(x)
    }

  /**
   * Delivers one event to one subscriber by sending it as an actor message.
   *
   * @param event      the event to deliver
   * @param subscriber the actor that receives the event
   */
  def publish(event: Event, subscriber: Subscriber): Unit =
    subscriber ! event

  /**
   * Extracts the classifier used to route an event.
   *
   * @param event the event being published
   * @return the event's `kind`
   */
  def classify(event: Event): Classifier = event.kind
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package tests | |
import org.scalatest.{BeforeAndAfter, FunSpec} | |
import org.scalatest.matchers.ShouldMatchers | |
/**
 * Tests for the EventDispatch bus: publishes events of different kinds and
 * checks which subscribers receive them.
 */
class EventBusTest extends FunSpec with ShouldMatchers with BeforeAndAfter {
  describe("basic event") {
    import akka.actor.ActorSystem
    import event.{EventDispatch, SimEvent, SimEventKind}
    import java.util.concurrent.TimeoutException

    // Small classifier hierarchy: Created and Destroyed are both LifeCycleEvents.
    class LifeCycleEvent() extends SimEventKind
    class Created() extends LifeCycleEvent
    class Destroyed() extends LifeCycleEvent

    // NOTE(review): this ActorSystem is never shut down; consider stopping it
    // once the suite finishes, using the shutdown call of the Akka version in use.
    implicit val system: ActorSystem = ActorSystem("test")
    val eventDispatch = new EventDispatch

    it("should be received by the subscribers") {
      import akka.actor.ActorDSL._

      // Fails the test unless the next message in `inbox` is a SimEvent of the expected kind.
      def expectEvent(who: String, inbox: Inbox, expected: Class[_ <: SimEventKind]): Unit =
        try {
          inbox.select() {
            case s: SimEvent => assert(s.kind === expected)
            case other       => fail(s"$who received an unknown message: $other")
          }
        } catch {
          case _: TimeoutException =>
            fail(s"$who timed out waiting for ${expected.getName}")
        }

      // Fails the test if any message reaches `inbox` before the select timeout expires.
      def expectNothing(who: String, inbox: Inbox): Unit = {
        val received: Option[Any] =
          try Some(inbox.select() { case msg => msg })
          catch { case _: TimeoutException => None }
        received.foreach(msg => fail(s"$who should not have received anything but got: $msg"))
      }

      val subscriber1 = inbox()
      val subscriber2 = inbox()
      val subscriber3 = inbox()

      eventDispatch.subscribe(subscriber1.receiver, classOf[Created])      // should only get Created
      eventDispatch.subscribe(subscriber2.receiver, classOf[SimEventKind]) // should get all subclasses of SimEventKind
      eventDispatch.subscribe(subscriber3.receiver, classOf[Destroyed])    // should only get Destroyed

      // publish a Created
      eventDispatch.publish(new SimEvent(classOf[Created]))
      expectEvent("subscriber1", subscriber1, classOf[Created]) // subscribed to Created
      expectEvent("subscriber2", subscriber2, classOf[Created]) // subscribed to the root kind
      expectNothing("subscriber3", subscriber3)                 // subscribed to Destroyed only

      // publish a Destroyed
      eventDispatch.publish(new SimEvent(classOf[Destroyed]))
      expectNothing("subscriber1", subscriber1)                   // subscribed to Created only
      expectEvent("subscriber2", subscriber2, classOf[Destroyed]) // subscribed to the root kind
      expectEvent("subscriber3", subscriber3, classOf[Destroyed]) // subscribed to Destroyed
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
It took me a while to sort out how to use SubchannelClassification. This worked, and all of these tests pass. I'm sure I made some mistakes. Feel free to point them out...
I hope this helps someone else.