Created
August 18, 2022 02:39
-
-
Save alexeykudinkin/d0d471685456cf3a1d698675928f2777 to your computer and use it in GitHub Desktop.
Ingesting from Apache Pulsar to Apache Hudi
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.sql.HoodieUnsafeUtils.createDataFrameFromRows | |
import org.apache.spark.sql.{DataFrame, SparkSession, functions} | |
// Active Spark session — supplied by the host environment (spark-shell /
// notebook); this bare declaration does not compile as a standalone file.
val spark: SparkSession

// Pulsar connection endpoints and the topic used throughout this walkthrough.
val adminURL    = "http://localhost:8080"   // Pulsar admin (HTTP) endpoint
val endpointURL = "pulsar://localhost:6650" // Pulsar broker (binary) endpoint
val topicName   = "stonks_test"             // topic written to and read from below
////////////////////////////////////////////////////////////////////////////////////
// Step 2: Preparing the Dataset
////////////////////////////////////////////////////////////////////////////////////

// Load the sample records from a local JSON file.
// NOTE(review): path is machine-specific — parameterize before reuse.
val df = spark.read
  .format("json")
  .load("/Users/alexey.kudinkin/code/github/apache/pulsar/.bin/2.10.1/batch.json")

// Tag every record with its destination topic, then materialize at most
// 1000 rows on the driver for the two-batch write below.
val batch =
  df.withColumn("__topic", functions.lit(topicName))
    .limit(1000)
    .collect()
////////////////////////////////////////////////////////////////////////////////////
// Step 3: Writing into Pulsar
////////////////////////////////////////////////////////////////////////////////////

/** Publishes every row of `df` to the configured Pulsar topic.
  *
  * Side-effecting: performs a batch write against the Pulsar cluster
  * reachable at `endpointURL` / `adminURL` defined above.
  *
  * @param df the DataFrame whose rows are written to `topicName`
  */
def writeToPulsar(df: DataFrame): Unit =
  df.write
    .format("pulsar")
    .option("service.url", endpointURL)
    .option("admin.url", adminURL)
    .option("topic", topicName)
    .save()
// Guard against an empty source file: `batch.head` below would otherwise
// throw a context-free NoSuchElementException.
require(batch.nonEmpty, s"Loaded batch is empty; check the source JSON for topic '$topicName'")

// All collected rows share one schema; take it from the first row.
val schema = batch.head.schema

// Split the collected rows into two halves to simulate two successive
// ingestion rounds into Pulsar (the second half may be empty if fewer
// than 501 rows were loaded — `drop` handles that safely).
val firstBatchDF  = createDataFrameFromRows(spark, batch.take(500), schema)
val secondBatchDF = createDataFrameFromRows(spark, batch.drop(500), schema)

// Dumping first batch
writeToPulsar(firstBatchDF)
// Dumping second batch
writeToPulsar(secondBatchDF)
////////////////////////////////////////////////////////////////////////////////////
// Appendix: Reading from Pulsar
////////////////////////////////////////////////////////////////////////////////////

// Batch-read the topic back from Pulsar using the same connector options
// as the write path above.
val readPulsarDF =
  spark.read
    .format("pulsar")
    .option("service.url", endpointURL)
    .option("admin.url", adminURL)
    .option("topic", topicName)
    .load()
////////////////////////////////////////////////////////////////////////////////////
// Appendix: Reading from Hudi
////////////////////////////////////////////////////////////////////////////////////

// Read back the Hudi table that the ingestion pipeline wrote under /tmp.
// NOTE(review): assumes the Hudi sink's base path is /tmp/pulsar/<topic> —
// confirm against the DeltaStreamer/sink configuration.
val readHudiDF =
  spark.read
    .format("hudi")
    .load(s"file:///tmp/pulsar/$topicName")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.