Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save prayagupa/a971c48d6c6ec49660ea to your computer and use it in GitHub Desktop.
Save prayagupa/a971c48d6c6ec49660ea to your computer and use it in GitHub Desktop.
An example of how to use an EventStream in Akka. See the article at http://danielasfregola.com/2015/04/20/peer-to-many-communication-in-akka/
package com.eventstream
import java.util.Date
/**
* Created by prayagupd
* on 2/6/16.
*/
/** Marker trait for events exchanged in this example. */
trait Event

/** Marker trait for log-style messages. */
trait MessageLog

/**
 * Event describing a package that has arrived.
 *
 * Declared `final` so the generated `equals`/`hashCode`/`copy` cannot be
 * broken by subclassing.
 *
 * @param packageId identifier of the package
 * @param items     the items carried by the package
 */
final case class PackageArrivedEvent(packageId: String, items: List[String]) extends Event

/**
 * Log message recording when a package was shipped.
 *
 * @param shippedDate the shipping date; `java.util.Date` is mutable, so do
 *                    not modify the instance after handing it over
 */
final case class PackageShipped(shippedDate: Date) extends MessageLog
import akka.actor._
import com.eventstream.PackageArrivedEvent
import com.eventstream.publisher.PackageArrivedPublisher
import com.eventstream.subscriber.PackageArrivedSubscriber
/**
 * Demo entry point: creates the "event-stream" actor system with one
 * publisher and two subscribers, sends a first event to the publisher,
 * unsubscribes `subscriber2` from [[PackageArrivedEvent]] on the system event
 * stream, then sends a second event.
 *
 * NOTE(review): the subscribers register themselves in their `preStart`
 * hook, and messages sent with `!` are processed asynchronously, so the
 * relative ordering of subscription, publication and the `unsubscribe` call
 * below does not appear to be guaranteed - confirm before relying on which
 * subscriber prints which event.
 *
 * NOTE(review): nothing in this object terminates the actor system.
 */
object EventStreamApplication extends App {

  // Implicit definitions carry an explicit type so resolution does not
  // depend on inference.
  implicit val system: ActorSystem = ActorSystem("event-stream")

  val packageId: String = "package001"

  val packagePublisher: ActorRef =
    system.actorOf(Props[PackageArrivedPublisher], name = "package-publisher")

  val subscriber1: ActorRef =
    system.actorOf(Props[PackageArrivedSubscriber], name = "subscriber-1")
  val subscriber2: ActorRef =
    system.actorOf(Props[PackageArrivedSubscriber], name = "subscriber-2")

  // First event: reuses `packageId` instead of repeating the literal.
  packagePublisher ! PackageArrivedEvent(packageId = packageId, items = List("shirts", "pants"))

  // Remove subscriber2's subscription for PackageArrivedEvent from the event stream.
  system.eventStream.unsubscribe(subscriber2, classOf[PackageArrivedEvent])

  // Second event.
  packagePublisher ! PackageArrivedEvent(packageId = "package002", items = List("underwear"))
}
package com.eventstream.publisher
import akka.actor._
import com.eventstream.PackageArrivedEvent
/**
* Created by prayagupd
* on 2/7/16.
*/
/**
 * Actor that forwards every [[PackageArrivedEvent]] it receives to the actor
 * system's event stream, printing a line before publishing.
 */
class PackageArrivedPublisher extends Actor {

  /** Handles [[PackageArrivedEvent]] messages only. */
  override def receive: Receive = {
    case packageToShip: PackageArrivedEvent =>
      println(s"Publishing a new package - $packageToShip")
      val stream = context.system.eventStream
      stream.publish(packageToShip)
  }
}
package com.eventstream.subscriber
import akka.actor.Actor
import com.eventstream.PackageArrivedEvent
/**
* Created by prayagupd
* on 2/7/16.
*/
/**
 * Actor that subscribes itself to [[PackageArrivedEvent]] on the actor
 * system's event stream when it starts, and prints each such event it
 * receives together with its own actor name.
 */
class PackageArrivedSubscriber extends Actor {

  /**
   * Registers this actor for [[PackageArrivedEvent]] on the event stream.
   *
   * Declared with `()` and an explicit `Unit` result so the signature matches
   * the lifecycle hook it overrides; the value returned by `subscribe` is
   * deliberately discarded.
   */
  override def preStart(): Unit = {
    context.system.eventStream.subscribe(self, classOf[PackageArrivedEvent])
    ()
  }

  /** Prints every received [[PackageArrivedEvent]], prefixed with this actor's name. */
  override def receive: Receive = {
    case packageToShip: PackageArrivedEvent =>
      println(s"${self.path.name} - receiving a new package: $packageToShip")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment