======SPARK SQL=========
create table table1 (
  id int,
  dt string,
  name string,
  price double,
  ts long
) using hudi
tblproperties (
  primaryKey = 'id',
  type = 'cow',
  preCombineField = 'ts'  -- assumed: the property list was truncated here in the original
);
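A quick smoke test of the table above, as a minimal PySpark sketch (assumes an active "spark" session with Hudi's SQL extensions enabled; the values are illustrative):

# Insert one row and read it back; Hudi meta columns appear alongside the declared fields.
spark.sql("insert into table1 values (1, '2023-03-01', 'AAPL', 150.25, 1677628800)")
spark.sql("select * from table1").show()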
======DELTASTREAMER: JSON DFS SOURCE (UTILITIES BUNDLE)=========
hadoop fs -cp s3://rxusandbox-us-west-2/testcases/stocks/data/schema.avsc /tmp/
hadoop fs -cp s3://rxusandbox-us-west-2/testcases/stocks/data/source /tmp/source_parquet
NOW=$(date '+%Y%m%dt%H%M%S')
# The flags after --schemaprovider-class were truncated in the original; the
# --hoodie-conf lines below are reconstructed from the schema/source copies above.
bin/spark-submit --master yarn --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
~/hudi-utilities-bundle_2.12-0.13.0.jar --target-base-path /tmp/deltastreamertest/stocks${NOW} \
--target-table stocks${NOW} --table-type COPY_ON_WRITE --base-file-format PARQUET \
--source-class org.apache.hudi.utilities.sources.JsonDFSSource \
--source-ordering-field ts --payload-class org.apache.hudi.common.model.DefaultHoodieRecordPayload \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--hoodie-conf hoodie.deltastreamer.schemaprovider.source.schema.file=/tmp/schema.avsc \
--hoodie-conf hoodie.deltastreamer.schemaprovider.target.schema.file=/tmp/schema.avsc \
--hoodie-conf hoodie.deltastreamer.source.dfs.root=/tmp/source_parquet
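Once the run above finishes, a hedged way to verify the output from a PySpark session (substitute the actual stocks${NOW} suffix the shell printed):

# Read back the DeltaStreamer output table.
df = spark.read.format("hudi").load("/tmp/deltastreamertest/stocks<NOW>")
df.printSchema()  # Hudi meta columns (_hoodie_commit_time, ...) plus the stock fields
print(df.count(), "records ingested")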
======DELTASTREAMER: JSON KAFKA SOURCE (CONTINUOUS)=========
~/spark/spark-3.2.3-bin-hadoop3.2/bin/spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
~/spark/code/hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.13.0.jar --table-type MERGE_ON_READ \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field ts \
--target-base-path /tmp/stock_ticks_cow \
--target-table stock_ticks_cow --props /var/demo/config/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--continuous \
--min-sync-interval-seconds 5
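Since the job above runs with --continuous, a sketch for polling commit progress from a second PySpark session (assumes the same /tmp/stock_ticks_cow base path):

import time

# Poll the latest Hudi commit time to watch the continuous pipeline make progress.
for _ in range(3):
    df = spark.read.format("hudi").load("/tmp/stock_ticks_cow")
    latest = df.agg({"_hoodie_commit_time": "max"}).collect()[0][0]
    print("latest commit:", latest, "rows:", df.count())
    time.sleep(10)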
======PYSPARK SESSION=========
from pyspark.sql.types import *
from pyspark.sql.functions import *
import time
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import Row
from datetime import date
spark = SparkSession \
    .builder \
    .appName("hudi-test") \
    .getOrCreate()  # builder chain was truncated in the original; app name is an assumption
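With the session in place, a minimal hedged upsert sketch using the imports above (table name, path, and option values are illustrative):

# Illustrative write: a tiny DataFrame upserted to a COW table at an assumed path.
rows = [Row(id=1, dt="2023-03-01", name="AAPL", price=150.25, ts=1677628800),
        Row(id=2, dt="2023-03-01", name="MSFT", price=250.10, ts=1677628800)]
df = spark.createDataFrame(rows)

hudi_options = {
    "hoodie.table.name": "table1_py",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.datasource.write.operation": "upsert",
}
df.write.format("hudi").options(**hudi_options).mode("append").save("/tmp/table1_py")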
======UTILITIES SLIM BUNDLE, SPARK 3.1=========
hadoop fs -cp s3://rxusandbox-us-west-2/testcases/stocks/data/schema.avsc /tmp/
hadoop fs -cp s3://rxusandbox-us-west-2/testcases/stocks/data/source /tmp/source_parquet
NOW=$(date '+%Y%m%dt%H%M%S')
# Tail of the command reconstructed from the copies above (the original was truncated after --source-class).
bin/spark-submit --master local --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
--jars /home/hadoop/v_3.1/hudi-hive-sync-bundle-0.13.0.jar,/home/hadoop/v_3.1/hudi-spark3.1-bundle_2.12-0.13.0.jar \
/home/hadoop/v_3.1/hudi-utilities-slim-bundle_2.12-0.13.0.jar \
--target-base-path /tmp/deltastreamertest/stocks${NOW} \
--target-table stocks${NOW} --table-type COPY_ON_WRITE --base-file-format PARQUET \
--source-class org.apache.hudi.utilities.sources.JsonDFSSource \
--source-ordering-field ts \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--hoodie-conf hoodie.deltastreamer.schemaprovider.source.schema.file=/tmp/schema.avsc \
--hoodie-conf hoodie.deltastreamer.schemaprovider.target.schema.file=/tmp/schema.avsc \
--hoodie-conf hoodie.deltastreamer.source.dfs.root=/tmp/source_parquet
======UTILITIES SLIM BUNDLE, SPARK 3.3=========
hadoop fs -cp s3://rxusandbox-us-west-2/testcases/stocks/data/schema.avsc /tmp/
hadoop fs -cp s3://rxusandbox-us-west-2/testcases/stocks/data/source /tmp/source_parquet
NOW=$(date '+%Y%m%dt%H%M%S')
# Tail of the command reconstructed as above; the stray double slash in the --jars path is also fixed.
bin/spark-submit --master local --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
--jars /home/hadoop/v_3.3/hudi-hive-sync-bundle-0.13.0.jar,/home/hadoop/v_3.3/hudi-spark3.3-bundle_2.12-0.13.0.jar \
/home/hadoop/v_3.3/hudi-utilities-slim-bundle_2.12-0.13.0.jar \
--target-base-path /tmp/deltastreamertest/stocks${NOW} \
--target-table stocks${NOW} --table-type COPY_ON_WRITE --base-file-format PARQUET \
--source-class org.apache.hudi.utilities.sources.JsonDFSSource \
--source-ordering-field ts \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--hoodie-conf hoodie.deltastreamer.schemaprovider.source.schema.file=/tmp/schema.avsc \
--hoodie-conf hoodie.deltastreamer.schemaprovider.target.schema.file=/tmp/schema.avsc \
--hoodie-conf hoodie.deltastreamer.source.dfs.root=/tmp/source_parquet
======UTILITIES SLIM BUNDLE, SPARK 3.2=========
hadoop fs -cp s3://rxusandbox-us-west-2/testcases/stocks/data/schema.avsc /tmp/
hadoop fs -cp s3://rxusandbox-us-west-2/testcases/stocks/data/source /tmp/source_parquet
NOW=$(date '+%Y%m%dt%H%M%S')
# Tail of the command reconstructed as above (the original was truncated after --source-class).
bin/spark-submit --master local --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
--jars /home/hadoop/v_3.2/hudi-hive-sync-bundle-0.13.0.jar,/home/hadoop/v_3.2/hudi-spark3.2-bundle_2.12-0.13.0.jar \
/home/hadoop/v_3.2/hudi-utilities-slim-bundle_2.12-0.13.0.jar \
--target-base-path /tmp/deltastreamertest/stocks${NOW} \
--target-table stocks${NOW} --table-type COPY_ON_WRITE --base-file-format PARQUET \
--source-class org.apache.hudi.utilities.sources.JsonDFSSource \
--source-ordering-field ts \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--hoodie-conf hoodie.deltastreamer.schemaprovider.source.schema.file=/tmp/schema.avsc \
--hoodie-conf hoodie.deltastreamer.schemaprovider.target.schema.file=/tmp/schema.avsc \
--hoodie-conf hoodie.deltastreamer.source.dfs.root=/tmp/source_parquet
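After running the three slim-bundle variants above (Spark 3.1/3.3/3.2), a hedged check that each run produced a readable table with the same row count (substitute each run's actual stocks${NOW} suffix):

# Each run writes to its own timestamped path; plug in the real suffixes.
paths = ["/tmp/deltastreamertest/stocks<NOW_3_1>",
         "/tmp/deltastreamertest/stocks<NOW_3_3>",
         "/tmp/deltastreamertest/stocks<NOW_3_2>"]
for p in paths:
    print(p, spark.read.format("hudi").load(p).count())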
======SCALA IMPORTS=========
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config._
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.keygen.constant._
======DELTASTREAMER: JSON KAFKA SOURCE (MOR)=========
~/spark/spark-3.2.3-bin-hadoop3.2/bin/spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
~/spark/code/hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.13.0.jar --table-type MERGE_ON_READ \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field ts \
--target-base-path /tmp/issue_5916 \
--target-table stock_ticks_cow --props /var/demo/config/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--continuous \
--min-sync-interval-seconds 5
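Because this run writes a MERGE_ON_READ table, a sketch contrasting snapshot and read-optimized query types on the output path:

# Snapshot view merges base files with pending log files; read-optimized skips the logs.
snapshot = spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "snapshot") \
    .load("/tmp/issue_5916")
read_optimized = spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "read_optimized") \
    .load("/tmp/issue_5916")
print("snapshot:", snapshot.count(), "read-optimized:", read_optimized.count())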
======GLUE SCRIPT=========
import sys
import os
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
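The script breaks off after the imports; a minimal hedged continuation showing the usual Glue job boilerplate plus a Hudi write (the job-arg names, sample rows, and output path are assumptions):

# Standard Glue job initialization, continuing the imports above.
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Illustrative Hudi write from Glue; the source rows and S3 path are placeholders.
df = spark.createDataFrame([(1, "2023-03-01", "AAPL", 150.25, 1677628800)],
                           ["id", "dt", "name", "price", "ts"])
hudi_options = {
    "hoodie.table.name": "glue_stocks",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.precombine.field": "ts",
}
df.write.format("hudi").options(**hudi_options).mode("append").save("s3://<bucket>/glue_stocks/")

job.commit()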