Skip to content

Instantly share code, notes, and snippets.

@sosuren
Created January 30, 2017 09:24
Show Gist options
  • Save sosuren/5556ab763689056d7f464856fe2beac7 to your computer and use it in GitHub Desktop.
Save sosuren/5556ab763689056d7f464856fe2beac7 to your computer and use it in GitHub Desktop.
Akka Streaming Sample to find distribution of Patients' Health based on Age and Gender
import akka.stream.stage.{InHandler, GraphStageLogic, GraphStage}
import sample.stream.AkkaStreamingSample.Gender.Gender
import scala.util.Random
object AkkaStreamingSample {
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
// Actor system named "Sample"; implicitly supplied to the materializer created below.
implicit val system: ActorSystem = ActorSystem("Sample")
// Materializer that is implicitly picked up when the graph is run in `main`.
implicit val materializer: ActorMaterializer = ActorMaterializer()
/**
  * Entry point: builds and runs a closed stream graph.
  *
  * The graph emits 200 randomly generated patient records, drops the ones whose
  * health index is out of range, and broadcasts the remaining records to the
  * age-distribution and gender-distribution sinks.
  *
  * NOTE(review): nothing in this method terminates the implicit `ActorSystem`,
  * so the JVM presumably stays alive after both sinks have printed — confirm
  * whether an explicit shutdown is wanted.
  *
  * @param args command-line arguments (unused)
  */
def main(args: Array[String]): Unit = {
  val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    import Helper._

    // Source of patient records: one random record per element of 1 to 200.
    val records = Source(1 to 200).map { _ =>
      PatientRecord(randomAge, randomGender, HealthIndex(randomIndex))
    }
    val outletPatientRecord: Outlet[PatientRecord] = builder.add(records).out

    // Keeps only records whose health index passes `isIndexInRange`.
    val validOnly = Flow[PatientRecord].filter(record => isIndexInRange(record.index.value))

    // Two-way fan-out so both distributions see every valid record.
    val broadcaster = builder.add(Broadcast[PatientRecord](2))

    // Sinks computing the distribution by age and by gender respectively.
    val inletByAge: Inlet[PatientRecord] =
      builder.add(Sink.fromGraph(new DistributionByAge)).in
    val inletByGender: Inlet[PatientRecord] =
      builder.add(Sink.fromGraph(new DistributionByGender)).in

    // Wiring: source -> filter -> broadcast -> (age sink, gender sink).
    outletPatientRecord ~> validOnly ~> broadcaster
    broadcaster ~> inletByAge
    broadcaster ~> inletByGender

    ClosedShape
  })

  graph.run()
}
/**
  * Patient gender as a Scala `Enumeration`.
  *
  * Ids follow declaration order (MALE = 0, FEMALE = 1), which is what lets
  * `Helper.randomGender` pick a value by a random id below `maxId`.
  */
object Gender extends Enumeration {
  type Gender = Value
  val MALE = Value
  val FEMALE = Value
}
/** Age bracket a patient falls into; see `Helper.lifeStage` for the age boundaries. */
object LifeStage extends Enumeration {
  type LifeStage = Value
  val CHILD = Value
  val ADULT = Value
  val OLD = Value
}
/**
  * Integer health index recorded for a patient.
  *
  * @param value the raw index; `Helper.isIndexInRange` and `Helper.isIndexFailed` interpret it
  */
case class HealthIndex(
  value: Int
)
/**
  * One patient observation flowing through the stream.
  *
  * @param age    patient age (an Int; generated by `Helper.randomAge` in this sample)
  * @param gender patient gender
  * @param index  health index recorded for the patient
  */
case class PatientRecord(
  age: Int,
  gender: Gender,
  index: HealthIndex
)
/** Constants, classification predicates and random generators shared by the sample. */
object Helper {
  // A health index strictly greater than this counts as failed (see isIndexFailed).
  val marginIndex: Int = 3
  val random: Random = new Random()
  // Exclusive upper bound handed to nextInt by randomAge.
  val maxAge: Int = 88
  // Inclusive bounds of a valid health index (see isIndexInRange).
  val minIndex: Int = 0
  val maxIndex: Int = 5

  /** True for ages up to and including 20. */
  def isChild(age: Int): Boolean = age <= 20

  /** True for ages from 21 to 45 inclusive. */
  def isAdult(age: Int): Boolean = age >= 21 && age <= 45

  /** True for ages of 46 and above. */
  def isOldAge(age: Int): Boolean = age >= 46

  /**
    * Classifies an age into its life stage using the same boundaries as
    * isChild / isAdult / isOldAge.
    */
  def lifeStage(age: Int): LifeStage.Value =
    if (isChild(age)) LifeStage.CHILD
    else if (isAdult(age)) LifeStage.ADULT
    else LifeStage.OLD // every remaining Int is >= 46

  /** True when the index is strictly above marginIndex. */
  def isIndexFailed(x: Int): Boolean = x > marginIndex

  /** True when the index lies within [minIndex, maxIndex]. */
  def isIndexInRange(idx: Int): Boolean = minIndex <= idx && idx <= maxIndex

  /** Random age in the range 0 until maxAge. */
  def randomAge: Int = random.nextInt(maxAge)

  /** Random gender, chosen by a random enumeration id below Gender.maxId. */
  def randomGender: Gender = Gender(random.nextInt(Gender.maxId))

  // Draws from 0 until 2 * maxIndex, so some indices land outside the valid
  // range; presumably intentional so that the stream's filter has something to drop.
  def randomIndex: Int = random.nextInt(maxIndex * 2)
}
/**
  * Sink stage that tallies patient records per life stage.
  *
  * For each of CHILD, ADULT and OLD it counts the records seen and how many of
  * them have a failed health index, then prints the tallies once upstream
  * finishes. An upstream failure is printed and fails the stage.
  */
class DistributionByAge extends GraphStage[SinkShape[PatientRecord]] {
  val in: Inlet[PatientRecord] = Inlet("DistributionByAge")
  override val shape: SinkShape[PatientRecord] = SinkShape.of(in)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Running tallies; they live inside this logic instance.
      private var numChildFailed = 0
      private var numChildTotal = 0
      private var numAdultFailed = 0
      private var numAdultTotal = 0
      private var numOldAgeFailed = 0
      private var numOldAgeTotal = 0
      private var totalCount = 0

      // A sink has to signal demand itself, so request the first element up front.
      override def preStart(): Unit = pull(in)

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          import Helper._
          val record = grab(in)
          val failed = isIndexFailed(record.index.value)
          lifeStage(record.age) match {
            case LifeStage.CHILD =>
              numChildTotal += 1
              if (failed) numChildFailed += 1
            case LifeStage.ADULT =>
              numAdultTotal += 1
              if (failed) numAdultFailed += 1
            case LifeStage.OLD =>
              numOldAgeTotal += 1
              if (failed) numOldAgeFailed += 1
          }
          totalCount += 1
          // Ask for the next record.
          pull(in)
        }

        override def onUpstreamFinish(): Unit = {
          // Report as failed/total per life stage, then complete.
          println(s"Distribution by Age [Total: $totalCount]:")
          println(s"\tChild: $numChildFailed/$numChildTotal")
          println(s"\tAdult: $numAdultFailed/$numAdultTotal")
          println(s"\tOld: $numOldAgeFailed/$numOldAgeTotal")
          completeStage()
        }

        override def onUpstreamFailure(ex: Throwable): Unit = {
          println(s"Upstream Failed: ${ex.getMessage}")
          failStage(ex)
        }
      })
    }
}
/**
  * Sink stage that tallies patient records per gender.
  *
  * For FEMALE and MALE it counts the records seen and how many of them have a
  * failed health index, then prints the tallies once upstream finishes. An
  * upstream failure is printed and fails the stage.
  */
class DistributionByGender extends GraphStage[SinkShape[PatientRecord]] {
  // Named after this stage (it previously carried the age stage's name, a copy-paste slip).
  val in: Inlet[PatientRecord] = Inlet("DistributionByGender")
  override val shape: SinkShape[PatientRecord] = SinkShape.of(in)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Running tallies; they live inside this logic instance.
      private var numFemaleFailed = 0
      private var numFemaleTotal = 0
      private var numMaleFailed = 0
      private var numMaleTotal = 0
      private var totalCount = 0

      // A sink has to signal demand itself, so request the first element up front.
      override def preStart(): Unit = pull(in)

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          import Helper._
          grab(in) match {
            case PatientRecord(_, Gender.FEMALE, idx) =>
              numFemaleTotal += 1
              if (isIndexFailed(idx.value)) numFemaleFailed += 1
            case PatientRecord(_, Gender.MALE, idx) =>
              numMaleTotal += 1
              if (isIndexFailed(idx.value)) numMaleFailed += 1
            case _ =>
              // Any other record is not attributed to a gender but still
              // counts towards totalCount below.
          }
          totalCount += 1
          // Ask for the next record.
          pull(in)
        }

        override def onUpstreamFinish(): Unit = {
          // Report as failed/total per gender, then complete.
          println(s"Distribution by Gender [Total: $totalCount]:")
          println(s"\tFemale: $numFemaleFailed/$numFemaleTotal")
          println(s"\tMale: $numMaleFailed/$numMaleTotal")
          completeStage()
        }

        override def onUpstreamFailure(ex: Throwable): Unit = {
          println(s"Upstream Failed: ${ex.getMessage}")
          failStage(ex)
        }
      })
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment