////////////////////////////////////////////////////////////////////////////////////
// Ingesting from Apache Pulsar to Apache Hudi
////////////////////////////////////////////////////////////////////////////////////
import org.apache.spark.sql.HoodieUnsafeUtils.createDataFrameFromRows
import org.apache.spark.sql.{DataFrame, SparkSession, functions}
// `spark` is provided out of the box when running in spark-shell; when running as a
// standalone application, create the session explicitly
val spark: SparkSession = SparkSession.builder().getOrCreate()
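// NB: the snippets below assume both the StreamNative pulsar-spark connector and the
// Hudi Spark bundle are on the classpath, e.g. (artifact versions are illustrative):
//
//   spark-shell --packages io.streamnative.connectors:pulsar-spark-connector_2.12:3.1.1.4,org.apache.hudi:hudi-spark3.2-bundle_2.12:0.12.0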
////////////////////////////////////////////////////////////////////////////////////
// Step 1: Configuring the Pulsar Connection
////////////////////////////////////////////////////////////////////////////////////
val adminURL = "http://localhost:8080"
val endpointURL = "pulsar://localhost:6650"
val topicName = "stonks_test"
////////////////////////////////////////////////////////////////////////////////////
// Step 2: Preparing the Dataset
////////////////////////////////////////////////////////////////////////////////////
val df = spark.read.format("json").
  load("/Users/alexey.kudinkin/code/github/apache/pulsar/.bin/2.10.1/batch.json")
// Tag each record with the target topic and cap the dataset at 1,000 rows
val batch = df.
  withColumn("__topic", functions.lit(topicName)).
  limit(1000).
  collect()
////////////////////////////////////////////////////////////////////////////////////
// Step 3: Writing into Pulsar
////////////////////////////////////////////////////////////////////////////////////
def writeToPulsar(df: DataFrame): Unit = {
  df.write.
    format("pulsar").
    option("service.url", endpointURL).
    option("admin.url", adminURL).
    option("topic", topicName).
    save()
}
// Split the collected rows into two 500-row batches
val schema = batch.head.schema
val firstBatchDF = createDataFrameFromRows(spark, batch.take(500), schema)
val secondBatchDF = createDataFrameFromRows(spark, batch.drop(500), schema)
// Dumping first batch
writeToPulsar(firstBatchDF)
// Dumping second batch
writeToPulsar(secondBatchDF)
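// Optionally verify the writes landed using the Pulsar CLI, e.g.:
//   pulsar-admin topics stats persistent://public/default/stonks_test
// (assuming the topic lives in the default public/default namespace)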
////////////////////////////////////////////////////////////////////////////////////
// Appendix: Reading from Pulsar
////////////////////////////////////////////////////////////////////////////////////
val readPulsarDF = spark.read.
  format("pulsar").
  option("service.url", endpointURL).
  option("admin.url", adminURL).
  option("topic", topicName).
  load()
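
////////////////////////////////////////////////////////////////////////////////////
// Appendix: Writing the ingested batch into Hudi
////////////////////////////////////////////////////////////////////////////////////
// The Pulsar -> Hudi leg itself isn't shown above; below is a minimal sketch of it,
// assuming `symbol` is a unique-key field present in batch.json (hypothetical;
// substitute a real key column), partitioning by the connector's `__topic` metadata
// column and deduplicating on its `__publishTime`. The target path matches what the
// Hudi read below expects.
readPulsarDF.write.
  format("hudi").
  option("hoodie.table.name", topicName).
  option("hoodie.datasource.write.recordkey.field", "symbol"). // hypothetical key
  option("hoodie.datasource.write.partitionpath.field", "__topic").
  option("hoodie.datasource.write.precombine.field", "__publishTime").
  mode("overwrite"). // bootstrap the table; use "append" for subsequent writes
  save(s"file:///tmp/pulsar/$topicName")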
////////////////////////////////////////////////////////////////////////////////////
// Appendix: Reading from Hudi
////////////////////////////////////////////////////////////////////////////////////
val readHudiDF = spark.read.
  format("hudi").
  load(s"file:///tmp/pulsar/$topicName")