@nsivabalan
Last active August 26, 2022 20:46
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import spark.implicits._ // needed for rdd.toDF() below

val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(1))
val inputPath = "/tmp/inputDir/"
val dStream = ssc.textFileStream(inputPath)

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow" // assumed value; the original snippet uses basePath without defining it
val dataGen = new DataGenerator

dStream.foreachRDD { rdd =>
  val batchDf = rdd.toDF()
  batchDf.write.format("hudi").
    options(getQuickstartWriteConfigs).
    option(PRECOMBINE_FIELD_OPT_KEY, "tpep_dropoff_datetime").
    option(RECORDKEY_FIELD_OPT_KEY, "tpep_pickup_datetime").
    option(PARTITIONPATH_FIELD_OPT_KEY, "VendorID").
    option(TABLE_NAME, tableName).
    mode(Append).
    save(basePath)
}
ssc.start()
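
A quick way to check whether those micro-batch writes actually landed is to read the Hudi table back in the same spark-shell. This is a sketch, assuming basePath points at the table written above; on older Hudi releases a partition glob such as basePath + "/*/*" may be needed instead of the bare path.

// read the Hudi table back and count records written so far
val tripsDf = spark.read.format("hudi").load(basePath)
tripsDf.count()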
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.sql.SaveMode._
import spark.implicits._ // needed for rdd.toDF() below

val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(1))
val inputPath = "/tmp/inputDir/"
val dStream = ssc.textFileStream(inputPath)
val basePath = "file:///tmp/parquet"

dStream.foreachRDD { rdd =>
  val batchDf = rdd.toDF()
  batchDf.write.format("parquet").
    mode(Append).
    save(basePath)
}
ssc.start()
val df = spark.read.format("parquet").load("/tmp/parquet/")
df: org.apache.spark.sql.DataFrame = [value: string]
scala> df.printSchema
root
|-- value: string (nullable = true)
scala> df.show(2, false)
+-----+
|value|
+-----+
+-----+
scala> df.count
res2: Long = 0
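
One likely culprit worth ruling out for the empty result above: textFileStream only picks up files that are created in (or atomically moved into) the monitored directory after ssc.start(); files that already existed there are ignored. A minimal probe, with a made-up file name, is to drop a fresh file into the watched directory once the stream is running:

// write a brand-new file into the watched directory *after* ssc.start(),
// then check whether the next micro-batch picks it up
import java.nio.file.{Files, Paths}
Files.write(Paths.get("/tmp/inputDir/probe.txt"), "hello\nworld\n".getBytes)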

guanziyue commented Aug 26, 2022

Hi nsivabalan, sorry, I have no experience with the Spark datasource API; I use the Java RDD API in Spark Streaming. Here is a simplified version of the script I use, with the business logic stripped out. From my side, I don't see anything wrong in your script. I am on Spark 3.0.1.

private transient JavaStreamingContext jsc;

initSparkContext();
// initialize the Kafka data source.
JavaInputDStream<ConsumerRecord<String, byte[]>> combinedInputStream = setUpInput(jsc, topicName, consumerGroupID);

combinedInputStream.foreachRDD(rdd -> {
    updateGlobalSchema();
    // initialize the Hudi write client
    SparkRDDWriteClient<FeatureStorePayload> client = getSparkRDDWriteClient(jsc.sparkContext(),cfg);
    // get kafka offsets so that we can commit it at the end of batch
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    preWrite(client);
    String newCommitTime = client.startCommit();
    // here we prepare the records in advance so that we can apply several optimizations. BUT THIS CODE IS NOT CONCURRENCY SAFE!
    // Any concurrent operation on the index must be fenced by users
    // TODO: move this logic into hudi rather than leaving it to users!!!!
    JavaRDD<HoodieRecord<FeatureStorePayload>> hudiRecord = preparedRecord(rdd, client);
    // call hudi to write data
    JavaRDD<WriteStatus> result = client.upsertPreppedRecords(hudiRecord, newCommitTime);
    result = result.cache();
    client.getEngineContext().setJobStatus(this.getClass().getSimpleName(), "Check Commit status error");
    if (handleErrors(jsc.sparkContext(), newCommitTime, result) != 0) {
        throw new RuntimeException("Commit result error");
    }
    client.getEngineContext().setJobStatus(this.getClass().getSimpleName(), "Commit write status collect");
    client.commit(newCommitTime, result, Option.empty());
    ((DirectKafkaInputDStream) combinedInputStream.inputDStream()).commitAsync(offsetRanges);
});

jsc.start();
try {
    jsc.awaitTermination();
} catch (InterruptedException e) {
    e.printStackTrace();
} finally {
    if (timelineServer.isPresent()) {
        timelineServer.get().stop();
    }
}
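
One design point worth noting in the sketch above: the Kafka offsets are committed via commitAsync only after client.commit succeeds, so a failed batch is replayed on restart. As far as I can tell, that gives at-least-once delivery into Hudi, with the upsert path deduplicating any replayed records by key.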


guanziyue commented Aug 26, 2022

Not sure about the status of the timeline. It seems the input stream didn't generate proper data. Any logs from the Hudi client side?
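
If it helps, a rough debugging sketch for the original Scala script (nothing here is verified against your setup): log each micro-batch size before writing, to see whether textFileStream is emitting any records at all, independent of Hudi.

// print the record count for every micro-batch interval
dStream.foreachRDD { (rdd, time) =>
  println(s"batch at $time: ${rdd.count()} records")
}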
