Skip to content

Instantly share code, notes, and snippets.

@agaro1121
Forked from htimur/Main.scala
Created August 30, 2018 09:30
Show Gist options
  • Save agaro1121/d860ec7f6813c19992fc8df8999d16bb to your computer and use it in GitHub Desktop.
Save agaro1121/d860ec7f6813c19992fc8df8999d16bb to your computer and use it in GitHub Desktop.
Example of extracting KinesisConsumer during Kinesis source creation for the WW-Digital/reactive-kinesis library
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.scaladsl.Sink
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer
import com.weightwatchers.reactive.kinesis.stream._
object Main extends App {
val sys = ActorSystem("kinesis-consumer-system")
var consumer = Option.empty[KinesisConsumer]
Kinesis
.source(
"consumer-name",
(conf: KinesisConsumer.ConsumerConf, eventProcessor: ActorRef) => {
val c = KinesisConsumer(conf, eventProcessor, sys)
consumer = Option(c)
c
}
)
.take(100)
.map(event => event.map(_.payloadAsString())) // read and map payload as string
.mapAsync(10)(event => event.mapAsync(Downloader.download(event.payload))) // handle an async message
.map(event => event.commit()) // mark the event as handled by calling commit
.runWith(Sink.seq)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment