Skip to content

Instantly share code, notes, and snippets.

@jeroenr
Last active December 9, 2019 15:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jeroenr/3b3d5b9507c0526e3b206b339c345120 to your computer and use it in GitHub Desktop.
Save jeroenr/3b3d5b9507c0526e3b206b339c345120 to your computer and use it in GitHub Desktop.
package com.github.jeroenr.rain.radar
import akka.event.Logging
import cloudflow.akkastream._
import cloudflow.akkastream.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import org.apache.avro.specific.SpecificRecordBase
import scala.reflect.ClassTag
/**
 * Base streamlet that logs every record arriving on its single inlet.
 *
 * Each element read from the inlet named "in" is handed to the actor system's
 * logger together with `template` and `logLevel`, and is then forwarded
 * unchanged to `sinkWithOffsetContext`.
 *
 * @tparam T       Avro record type read from the inlet
 * @param template message template passed to the logger, with the element as its
 *                 single argument
 * @param logLevel level the message is logged at; defaults to `Logging.InfoLevel`
 */
abstract class LoggerStreamlet[T <: SpecificRecordBase : ClassTag](
    template: String,
    logLevel: Logging.LogLevel = Logging.InfoLevel
) extends AkkaStreamlet {

  /** The only inlet of this streamlet, named "in". */
  val inlet = AvroInlet[T](name = "in")

  /** Shape with a single inlet and no outlets. */
  val shape = StreamletShape.withInlets(inlet)

  override def createLogic = new RunnableGraphStreamletLogic() {

    /** Logs each record as a side effect and emits it unmodified. */
    def flow =
      FlowWithOffsetContext[T].map { record =>
        system.log.log(logLevel, template, record)
        record
      }

    /**
     * Wires the inlet through the logging flow into the offset-context sink.
     * NOTE(review): presumably the sink takes care of committing offsets —
     * confirm against the Cloudflow version in use.
     */
    def runnableGraph =
      sourceWithOffsetContext(inlet).via(flow).to(sinkWithOffsetContext)
  }
}
/** Logs every incoming `Rain` record using the "Rain detected: {}" template. */
class RainLogger
  extends LoggerStreamlet[Rain](template = "Rain detected: {}")
/** Logs every incoming `Clutter` record using the "Clutter detected: {}" template. */
class ClutterLogger
  extends LoggerStreamlet[Clutter](template = "Clutter detected: {}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment