Skip to content

Instantly share code, notes, and snippets.

View jeroenr's full-sized avatar

Jeroen Rosenberg jeroenr

View GitHub Profile
package com.github.jeroenr.rain.radar
import cloudflow.akkastream._
import cloudflow.akkastream.util.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._
class RainClutterPartitioner extends AkkaStreamlet {
val in = AvroInlet[PrecipitationData]("in")
val clutter = AvroOutlet[Clutter]("clutter").withPartitioner(RoundRobinPartitioner)
package com.github.jeroenr.rain.radar
import java.time.Instant
import spray.json._
trait InstantJsonSupport extends DefaultJsonProtocol {
implicit object InstantFormat extends JsonFormat[Instant] {
def write(instant: Instant) = JsNumber(instant.toEpochMilli)
package com.github.jeroenr.rain.radar
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import cloudflow.akkastream._
import cloudflow.akkastream.util.scaladsl._
import cloudflow.streamlets._
import cloudflow.streamlets.avro._
import com.github.jeroenr.rain.radar.PrecipitationDataJsonSupport._
class PrecipitationDataHttpIngress extends AkkaServerStreamlet {
{
"namespace": "com.github.jeroenr.rain.radar",
"type": "record",
"name": "Clutter",
"fields":[
{
"name": "timestamp",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
{
"namespace": "com.github.jeroenr.rain.radar",
"type": "record",
"name": "Rain",
"fields":[
{
"name": "timestamp",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
{
"namespace": "com.github.jeroenr.rain.radar",
"type": "record",
"name": "Location",
"fields":[
{
"name": "lat", "type": "double"
},
{
"name": "lng", "type": "double"
{
"namespace": "com.github.jeroenr.rain.radar",
"type": "record",
"name": "PrecipitationData",
"fields":[
{
"name": "timestamp",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
resolvers += Resolver.url("lightbend-oss", url("https://lightbend.bintray.com/cloudflow"))(Resolver.ivyStylePatterns)
resolvers += "Akka Snapshots" at "https://repo.akka.io/snapshots/"
addSbtPlugin("com.lightbend.cloudflow" % "sbt-cloudflow" % "1.3.0-M1")
interface Service {
Flux<String> observe();
Mono<Void> save(String s);
}
class Foo {
private final Service service;
void longRunningProblem() {
service.observe()
.flatMap(service::save, 10)
interface Service {
Flux<String> observe();
Mono<Void> save(String s);
}
class Foo {
private final Service service;
void longRunningProblem() {
service.observe()
.flatMap(service::save, 10)