eduardorost / merge-schemas.scala
Last active December 19, 2023 09:36
Merge Schema with structs
build.sbt:

name := "streaming_mongodb"
version := "0.1"
scalaVersion := "2.12.9"

libraryDependencies ++= Seq(
  "org.mongodb.scala" %% "mongo-scala-driver" % "2.7.0",
  "org.apache.spark" %% "spark-streaming" % "2.4.3",
  "org.apache.spark" %% "spark-sql" % "2.4.3",
  "org.apache.hadoop" % "hadoop-aws" % "3.2.0",
  "org.apache.hadoop" % "hadoop-common" % "3.2.0"
)

merge-schemas.scala:

import com.mongodb.client.model.changestream.{ChangeStreamDocument, FullDocument}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization.write
import org.mongodb.scala.{Document, MongoClient, Observer}
import org.slf4j.{Logger, LoggerFactory}

object Main {

  val logger: Logger = LoggerFactory.getLogger(this.getClass)
  // Single SparkConf (the original declared it twice). The v2 file-output
  // committer speeds up commits to S3, and the write-ahead-log settings close
  // log files after each write so they are safe on object storage.
  private lazy val sparkConf: SparkConf = new SparkConf()
    .setMaster("local[*]")
    .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    .set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
    .set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")

  val spark: SparkSession = SparkSession
    .builder()
    .config(sparkConf)
    .getOrCreate()
  // mapRecord is not in this excerpt; an assumed stand-in that forwards the full document as JSON.
  def mapRecord(changeDocument: ChangeStreamDocument[Document]): Map[String, Any] =
    Option(changeDocument.getFullDocument)
      .map(doc => Map("fullDocument" -> doc.toJson()))
      .getOrElse(Map.empty)

  // Serialize the change event to a JSON line and fold it into the queue,
  // unioning with any RDD already waiting so no event is lost between batches.
  def process(changeDocument: ChangeStreamDocument[Document]): Unit = {
    implicit val formats: DefaultFormats.type = DefaultFormats
    val rdd = spark.sparkContext.makeRDD(List(write(mapRecord(changeDocument))))
    if (rddQueue.isEmpty)
      rddQueue.enqueue(rdd)
    else
      rddQueue.enqueue(rddQueue.dequeue().union(rdd))
  }
  // Read the queued JSON lines back through Spark's JSON reader, which merges
  // the schemas it infers across records, then append snappy parquet to S3.
  def sendS3(rdd: RDD[String]): Unit = {
    spark.read.json(rdd)
      .coalesce(1)
      .write
      .mode("append")
      .option("compression", "snappy")
      .parquet("s3://bucket/folder")
  }
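
  // Hedged, runnable sketch (not part of the original gist): two JSON events
  // with different fields, including a nested struct, come back from
  // spark.read.json as one DataFrame whose schema is the union of both.
  def schemaMergeDemo(): Unit = {
    val events = spark.sparkContext.makeRDD(Seq(
      """{"id": 1, "user": {"name": "ana"}}""",
      """{"id": 2, "user": {"name": "bob", "age": 30}}"""
    ))
    spark.read.json(events).printSchema()
    // root
    //  |-- id: long (nullable = true)
    //  |-- user: struct (nullable = true)
    //  |    |-- age: long (nullable = true)
    //  |    |-- name: string (nullable = true)
  }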
  // RDDs built from change events wait here until the next micro-batch.
  val rddQueue: collection.mutable.Queue[RDD[String]] = collection.mutable.Queue()

  def main(args: Array[String]): Unit = {
    // Reuse the session's SparkContext: constructing the StreamingContext from
    // sparkConf would try to start a second SparkContext and fail.
    val ssc = new StreamingContext(spark.sparkContext, Seconds(60))

    ssc.queueStream(rddQueue)
      .foreachRDD(rdd => {
        if (!rdd.isEmpty()) sendS3(rdd)
      })
    ssc.start()

    // Placeholder connection strings from the original gist.
    val client = MongoClient("mongodb.uri")
    val database = client.getDatabase("mongodb.database")
    val collection = database.getCollection("mongodb.collection")

    // UPDATE_LOOKUP makes update events carry the whole document, not just the
    // changed fields, so every event can be serialized the same way.
    collection.watch()
      .fullDocument(FullDocument.UPDATE_LOOKUP)
      .subscribe(new Observer[ChangeStreamDocument[Document]] {
        override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit = process(changeDocument)
        override def onError(e: Throwable): Unit = logger.error(s"Error: $e")
        override def onComplete(): Unit = logger.info("Completed")
      })

    ssc.awaitTermination()
  }
}
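
The excerpt never shows the explicit struct merging the gist's title promises; spark.read.json handles it implicitly during inference. For cases where the merge must be done by hand on two existing schemas, a minimal sketch of one common approach, recursing into nested structs (mergeSchemas is a hypothetical helper, not code from the gist):

import org.apache.spark.sql.types._

// Keep every field that appears in either schema; where both sides have a
// struct under the same name, merge those structs recursively.
def mergeSchemas(left: StructType, right: StructType): StructType = {
  val rightByName = right.fields.map(f => f.name -> f).toMap
  val merged = left.fields.map { lf =>
    (lf.dataType, rightByName.get(lf.name).map(_.dataType)) match {
      case (ls: StructType, Some(rs: StructType)) => lf.copy(dataType = mergeSchemas(ls, rs))
      case _                                       => lf // left side wins on type conflicts
    }
  }
  val leftNames = left.fieldNames.toSet
  StructType(merged ++ right.fields.filterNot(f => leftNames(f.name)))
}

Usage would look like mergeSchemas(dfA.schema, dfB.schema): the result is a target StructType that both frames can be aligned to before a union.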
eduardorost / apache.conf
Last active November 26, 2018 22:00
docker-compose and configuration for elastic stack
# Listen for events shipped by Beats (e.g. Filebeat) on the default port.
input {
  beats {
    port => 5044
  }
}
filter {