Skip to content

Instantly share code, notes, and snippets.

Avatar

Jeroen Rosenberg jeroenr

View GitHub Profile
View build.sbt
import sbt._
import sbt.Keys._
lazy val rainRadar = (project in file("."))
.enablePlugins(CloudflowAkkaStreamsApplicationPlugin)
.settings(
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-http-spray-json" % "10.1.10",
"ch.qos.logback" % "logback-classic" % "1.2.3"
),
View blueprint.conf
blueprint {
streamlets {
http-ingress = com.github.jeroenr.rain.radar.PrecipitationDataHttpIngress
partitioner = com.github.jeroenr.rain.radar.RainClutterPartitioner
rain-logger = com.github.jeroenr.rain.radar.RainLogger
clutter-logger = com.github.jeroenr.rain.radar.ClutterLogger
}
connections {
http-ingress.out = [partitioner.in]
View RainLogger.scala
package com.github.jeroenr.rain.radar
import akka.event.Logging
import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import org.apache.avro.specific.SpecificRecordBase
import scala.reflect.ClassTag
View RainClutterPartitioner.scala
package com.github.jeroenr.rain.radar
import cloudflow.akkastream._
import cloudflow.akkastream.util.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._
class RainClutterPartitioner extends AkkaStreamlet {
val in = AvroInlet[PrecipitationData]("in")
val clutter = AvroOutlet[Clutter]("clutter").withPartitioner(RoundRobinPartitioner)
View JsonFormats.scala
package com.github.jeroenr.rain.radar
import java.time.Instant
import spray.json._
trait InstantJsonSupport extends DefaultJsonProtocol {
implicit object InstantFormat extends JsonFormat[Instant] {
def write(instant: Instant) = JsNumber(instant.toEpochMilli)
View PrecipitationDataHttpIngress.scala
package com.github.jeroenr.rain.radar
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import cloudflow.akkastream._
import cloudflow.akkastream.util.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import com.github.jeroenr.rain.radar.PrecipitationDataJsonSupport._
class PrecipitationDataHttpIngress extends AkkaServerStreamlet {
View Clutter.avsc
{
"namespace": "com.github.jeroenr.rain.radar",
"type": "record",
"name": "Clutter",
"fields":[
{
"name": "timestamp",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
View Rain.avsc
{
"namespace": "com.github.jeroenr.rain.radar",
"type": "record",
"name": "Rain",
"fields":[
{
"name": "timestamp",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
View Location.avsc
{
"namespace": "com.github.jeroenr.rain.radar",
"type": "record",
"name": "Location",
"fields":[
{
"name": "lat", "type": "double"
},
{
"name": "lng", "type": "double"
View PrecipitationData.avsc
{
"namespace": "com.github.jeroenr.rain.radar",
"type": "record",
"name": "PrecipitationData",
"fields":[
{
"name": "timestamp",
"type": {
"type": "long",
"logicalType": "timestamp-millis"