Created
August 30, 2018 09:25
-
-
Save htimur/be9a6c22a286c13cd104099c3129555d to your computer and use it in GitHub Desktop.
Example of extracting KinesisConsumer during Kinesis source creation for the WW-Digital/reactive-kinesis library
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Sink
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer
import com.weightwatchers.reactive.kinesis.stream._
/** Example entry point: builds a Kinesis source with a custom consumer factory so that the
  * `KinesisConsumer` instance created for the source can be captured in [[Main.consumer]].
  *
  * The stream takes 100 events, maps each payload to a string, passes it through
  * `Downloader.download`, commits the event and collects the results with `Sink.seq`.
  *
  * NOTE(review): `Downloader` is not defined in this file — presumably supplied elsewhere.
  */
object Main extends App {

  // Implicit so that stream materialization (and any implicit ActorSystem parameter the
  // library's `Kinesis.source` may declare — confirm against the library) can resolve it.
  implicit val sys: ActorSystem = ActorSystem("kinesis-consumer-system")

  // `runWith` needs an implicit Materializer; without one the stream cannot be run.
  implicit val mat: Materializer = ActorMaterializer()

  // Holds the consumer created by the factory below; stays empty until the factory is invoked.
  // NOTE(review): if the library calls the factory on another thread, readers of this field
  // may need @volatile or an AtomicReference — confirm before reading it concurrently.
  var consumer = Option.empty[KinesisConsumer]

  Kinesis
    .source(
      "consumer-name",
      (conf: KinesisConsumer.ConsumerConf, eventProcessor: ActorRef) => {
        // Build the consumer ourselves so a reference to it can be kept.
        val c = KinesisConsumer(conf, eventProcessor, sys)
        consumer = Option(c)
        c
      }
    )
    .take(100)
    .map(event => event.map(_.payloadAsString())) // read and map payload as string
    // NOTE(review): `event.mapAsync` may require an implicit ExecutionContext — confirm.
    .mapAsync(10)(event => event.mapAsync(Downloader.download(event.payload))) // handle an async message
    .map(event => event.commit()) // mark the event as handled by calling commit
    .runWith(Sink.seq)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment