Alexey Kudinkin alexeykudinkin

alexeykudinkin / ingest_p2.scala
Created August 18, 2022 02:39
Ingesting from Apache Pulsar to Apache Hudi
import org.apache.spark.sql.HoodieUnsafeUtils.createDataFrameFromRows
import org.apache.spark.sql.{DataFrame, SparkSession, functions}
val spark: SparkSession = SparkSession.builder.getOrCreate() // obtain the active session (e.g. in spark-shell)
val adminURL = "http://localhost:8080"
val endpointURL = "pulsar://localhost:6650"
val topicName = "stonks_test"
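The connection values above are what a Spark reader for Pulsar typically needs. A minimal sketch of how they could be assembled into reader options for the StreamNative pulsar-spark connector — the option keys (`service.url`, `admin.url`, `topic`) follow that connector's documented names and should be treated as assumptions if your connector version differs:

```scala
// Sketch: reader options for the pulsar-spark connector, built from the
// endpoint and topic values defined above. Key names are assumptions
// based on the StreamNative connector's documentation.
val adminURL = "http://localhost:8080"
val endpointURL = "pulsar://localhost:6650"
val topicName = "stonks_test"

val pulsarReadOptions: Map[String, String] = Map(
  "service.url" -> endpointURL, // broker endpoint used for consuming messages
  "admin.url"   -> adminURL,    // admin endpoint used for topic metadata
  "topic"       -> topicName    // the single topic to subscribe to
)
```

With a live `SparkSession`, these would be applied roughly as `spark.read.format("pulsar").options(pulsarReadOptions).load()`.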
alexeykudinkin / ingest_p1.sh
Created August 18, 2022 02:36
Ingesting from Apache Pulsar to Apache Hudi
################################################################################
# Step 1: Start Pulsar in standalone mode (using Docker)
################################################################################
docker run -it -p 6650:6650 -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:2.10.1 bin/pulsar standalone
################################################################################
# Step 4: Ingest using Hudi's DeltaStreamer utility
################################################################################
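The preview elides the DeltaStreamer command for step 4. A hedged sketch of what such an invocation could look like — the bundle jar path, table name, base path, and ordering field are placeholders, and the Pulsar connection properties (endpoints, topic) would be supplied via a `--props` file, omitted here:

```shell
# Sketch (placeholder values): a minimal spark-submit invocation of Hudi's
# DeltaStreamer. PulsarSource is assumed available in the utilities bundle.
HUDI_UTILITIES_JAR=./hudi-utilities-bundle.jar   # placeholder path

DELTASTREAMER_CMD="spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  ${HUDI_UTILITIES_JAR} \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.PulsarSource \
  --target-base-path file:///tmp/hudi/stonks_test \
  --target-table stonks_test \
  --source-ordering-field ts"

echo "${DELTASTREAMER_CMD}"
```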
alexeykudinkin / query.scala
Created June 9, 2022 20:39
Querying previously ingested Amazon Reviews dataset
// NOTE: This relies on the dataset previously ingested in here:
// https://gist.github.com/alexeykudinkin/233ce2d365ae4a9833f557de7ed5d1b9
////////////////////////////////////////////////////////////////
// Preparing tables for Querying
////////////////////////////////////////////////////////////////
// Temp Table w/ Data Skipping DISABLED
val readDF: DataFrame = {
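The comparison above hinges on toggling data skipping between the two temp tables. A minimal sketch of the reader options involved — `hoodie.metadata.enabled` and `hoodie.enable.data.skipping` are the Hudi configs gating column-stats-based file pruning, though exact names can vary across Hudi versions, so treat them as assumptions:

```scala
// Sketch: reader options for the two temp tables being compared.
// The metadata table must be enabled for data skipping to take effect.
def readOptions(dataSkipping: Boolean): Map[String, String] = Map(
  "hoodie.metadata.enabled"     -> "true",                // read from the metadata table
  "hoodie.enable.data.skipping" -> dataSkipping.toString  // toggle file pruning
)

// Data Skipping DISABLED (as in the snippet above):
val noSkipping = readOptions(dataSkipping = false)
// Data Skipping ENABLED, for comparison:
val withSkipping = readOptions(dataSkipping = true)
```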
alexeykudinkin / read_column_stats.scala
Created June 9, 2022 20:36
Read Hudi's Column Stats Index
////////////////////////////////////////////////////////////////
// Load Metadata Table (for validation)
////////////////////////////////////////////////////////////////
val metadataTablePath = s"$outputPath/.hoodie/metadata"
val targetColStatsIndexColumns = Seq(
HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
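As the snippet shows, the metadata table is just another Hudi table stored under the base table's `.hoodie` directory. A small sketch of deriving that path, where `outputPath` stands for the Hudi table's base path:

```scala
// Sketch: the metadata table lives under the table's .hoodie directory.
// "outputPath" is the base path of the Hudi table being validated.
def metadataTablePath(outputPath: String): String =
  s"$outputPath/.hoodie/metadata"

// With a live SparkSession, the column stats records could then be read
// back for validation roughly as:
//   spark.read.format("hudi")
//     .load(metadataTablePath(outputPath))
```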
alexeykudinkin / ingest.scala
Created June 9, 2022 20:34
Ingesting Amazon Reviews dataset into a Hudi table on S3
import org.apache.hadoop.fs.{FileStatus, Path}
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.HoodieTableMetaClient
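The imports above set up a Hudi write job. A hedged sketch of a representative option set for such an ingest — the field names (record key, precombine, partition path) are placeholders, not the gist's actual choices, which live in the full file:

```scala
// Sketch: core Hudi write options for an ingest job. Field values are
// placeholders; the config key names are Hudi's standard datasource options.
val hudiWriteOptions: Map[String, String] = Map(
  "hoodie.table.name"                           -> "amazon_reviews",
  "hoodie.datasource.write.recordkey.field"     -> "review_id",        // placeholder
  "hoodie.datasource.write.precombine.field"    -> "review_date",      // placeholder
  "hoodie.datasource.write.partitionpath.field" -> "product_category", // placeholder
  "hoodie.metadata.enabled"                     -> "true" // build the metadata table on write
)

// With a DataFrame `df`, the write would look roughly like:
//   df.write.format("hudi").options(hudiWriteOptions).mode(Overwrite).save(outputPath)
```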