Install the following tools
- git
- docker & docker-compose
- Google Cloud CLI
- terraform
- node 16 & yarn
- VSCode & terraform plugin
AWSTemplateFormatVersion: '2010-09-09' | |
Transform: AWS::Serverless-2016-10-31 | |
Description: AWS Kinesis Data Stream to AWS Lambda with Enhanced Fanout via a consumer | |
Resources: | |
# Define an AWS Kinesis Data Stream | |
KinesisStream: | |
Type: "AWS::Kinesis::Stream" | |
Properties: | |
ShardCount: 80 |
Install the following tools
val shardOplogSources: Seq[Source[Document, NotUsed]] = shardService.getShards.map { shard => | |
RestartSource.withBackoff( | |
minBackoff = 3.seconds, | |
maxBackoff = 30.seconds, | |
randomFactor = 0.2, // adds 20% "noise" to vary the intervals slightly | |
maxRestarts = 20 // limits the amount of restarts to 20 | |
) { () => | |
oplog.source(MongoClient(shard.uri)) | |
} | |
} |
package htimur.services | |
import akka.NotUsed | |
import akka.stream.alpakka.mongodb.scaladsl.MongoSource | |
import akka.stream.scaladsl.Source | |
import com.mongodb.CursorType | |
import htimur.configs.MongoConstants | |
import htimur.models.OplogOperation | |
import org.mongodb.scala.bson.collection.immutable.Document | |
import org.mongodb.scala.model.Filters._ |
name := "MongoDB Replica Set oplog tailing with Akks Streams" | |
version := "1.0" | |
scalaVersion := "2.12.7" | |
libraryDependencies ++= Seq( | |
"ch.qos.logback" % "logback-classic" % "1.2.3", | |
"org.mongodb.scala" %% "mongo-scala-driver" % "2.4.2", | |
"com.typesafe.akka" %% "akka-stream" % "2.5.17" |
import akka.actor.{ActorRef, ActorSystem} | |
import akka.stream.scaladsl.Sink | |
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer | |
import com.weightwatchers.reactive.kinesis.stream._ | |
object Main extends App { | |
val sys = ActorSystem("kinesis-consumer-system") | |
var consumer = Option.empty[KinesisConsumer] |
GET /admin/metrics com.kenshoo.play.metrics.MetricsController.metrics |
-- Counters -------------------------------------------------------------------- | |
controllers.HomeController.index.current | |
count = 1 | |
-- Meters ---------------------------------------------------------------------- | |
controllers.HomeController.index.meter | |
count = 1 | |
mean rate = 0.25 events/second | |
1-minute rate = 0.00 events/second | |
5-minute rate = 0.00 events/second |
@Singleton | |
class HomeController @Inject() extends Controller { | |
@Counted(monotonic = true) | |
@Timed | |
@Metered | |
def index = Action { | |
Ok(views.html.index("Your new application is ready.")) | |
} |
metrics { | |
jvm = false | |
logback = false | |
reporters = [ | |
{ | |
type: "console" | |
frequency: "10 seconds" | |
} | |
] |