Skip to content

Instantly share code, notes, and snippets.

@jeroenr
Last active December 6, 2019 15:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jeroenr/8b9fd2162e6277bd01012858891bd6fa to your computer and use it in GitHub Desktop.
Save jeroenr/8b9fd2162e6277bd01012858891bd6fa to your computer and use it in GitHub Desktop.
package com.github.jeroenr.rain.radar
import cloudflow.akkastream._
import cloudflow.akkastream.util.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._
/**
 * Akka streamlet that reads [[PrecipitationData]] from a single inlet and
 * splits it over two outlets: `clutter` and `rain`.
 *
 * Readings whose `value` is not greater than zero are dropped. Of the
 * remaining readings, those with a `value` of at most 0.1 become [[Clutter]]
 * records and all others become [[Rain]] records.
 */
class RainClutterPartitioner extends AkkaStreamlet {

  /** Inlet carrying the raw precipitation readings. */
  val in = AvroInlet[PrecipitationData]("in")

  /** Outlet for readings classified as clutter; uses `RoundRobinPartitioner`. */
  val clutter = AvroOutlet[Clutter]("clutter").withPartitioner(RoundRobinPartitioner)

  /** Outlet for readings classified as rain; partitioned by the record's `city`. */
  val rain = AvroOutlet[Rain]("rain").withPartitioner(rainRecord => rainRecord.city)

  /** One inlet, two outlets. */
  val shape = StreamletShape(in).withOutlets(clutter, rain)

  /**
   * Builds the splitting logic: each surviving reading is mapped to an
   * `Either`, with `Left` holding a [[Clutter]] and `Right` holding a [[Rain]].
   */
  override def createLogic = new SplitterLogic(in, clutter, rain) {
    def flow = {
      // Disregard data if it's dry.
      val wetReadings = flowWithOffsetContext().filter(reading => reading.value > 0)

      wetReadings.map { reading =>
        reading.value match {
          // Very small amounts (at most 0.1) are treated as clutter.
          case amount if amount <= 0.1 =>
            Left(Clutter(reading.timestamp, amount))
          // Anything above that threshold counts as rain for the reading's city.
          case amount =>
            Right(Rain(reading.timestamp, reading.location.city, amount))
        }
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment