Parquet to Avro Spark conversion
@Igosuki, created August 31, 2021

Converts every Parquet file matching a glob into one Avro file per input, using spark-daria's DariaWriters.writeSingleFile. Files in this gist: the parquet_to_avro wrapper, log4j.properties, parquet_to_avro.scala, and the run_script spark-shell launcher.
#!/bin/sh
# etl/bin/parquet_to_avro: forwards the input glob ($1) and the output
# directory ($2) to the Scala conversion script as Spark conf values.
BASEDIR=$(dirname "$0")
"$BASEDIR/run_script" "$BASEDIR/parquet_to_avro.scala" -c "spark.driver.globs=$1" -c "spark.driver.out=$2" -c spark.sql.sources.partitionOverwriteMode=dynamic

# log4j.properties: rolling file appender for the driver log, plus a console appender.
log4j.rootCategory=info,rolling
log4j.appender.rolling=org.apache.log4j.RollingFileAppender
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
log4j.appender.rolling.layout.conversionPattern=[%d] %p %m (%c)%n
log4j.appender.rolling.maxFileSize=100MB
log4j.appender.rolling.maxBackupIndex=50
# Alternative log location when running under YARN (the later duplicate key
# would override this one anyway, so it is commented out here):
#log4j.appender.rolling.file=${spark.yarn.app.container.log.dir}/${vm.logging.name}-driver.log
log4j.appender.rolling.file=/var/log/spark/spark.log
log4j.appender.rolling.encoding=UTF-8
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout=%d [%t] %-5p %c - %m%n

// parquet_to_avro.scala: loaded by spark-shell via -i (see run_script).
// Reads the input glob and output directory from the Spark conf values set
// by the wrapper, then converts each matching Parquet file to a single Avro file.
import com.github.mrpowers.spark.daria.sql.DariaWriters
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import org.apache.commons.io.FilenameUtils

val fileGlobs = sc.getConf.get("spark.driver.globs")
val dest = sc.getConf.get("spark.driver.out")

val fs = FileSystem.get(new Configuration(true))
val status = fs.globStatus(new Path(fileGlobs))

for (fileStatus <- status) {
  val path = fileStatus.getPath().toString()
  try {
    val dfin = spark.read.format("parquet").load(path)
    val fileName = fileStatus.getPath().getName()
    val fileNameWithOutExt = FilenameUtils.removeExtension(fileName)
    val destination = s"${dest}/${fileNameWithOutExt}.avro"
    println(s"Converting $path to avro at $destination")
    // writeSingleFile coalesces the DataFrame to one partition, writes it
    // under tmpFolder, then moves the single part file to the destination.
    DariaWriters.writeSingleFile(
      df = dfin,
      format = "avro",
      sc = spark.sparkContext,
      tmpFolder = s"/tmp/dw/${fileName}",
      filename = destination
    )
  } catch {
    case e: Throwable => println(s"Failed to convert $path: ${e.getMessage}")
  }
}
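
For readers without the spark-daria dependency, the single-file write that DariaWriters.writeSingleFile performs can be approximated directly with the Hadoop FileSystem API. This is a minimal sketch of the pattern, not the library's exact implementation; writeSingleAvro is a hypothetical helper and error handling is omitted.

// Hypothetical helper, not part of the gist: coalesce to one partition,
// write into a temp dir, then rename the lone part file to the target name.
def writeSingleAvro(df: org.apache.spark.sql.DataFrame, tmpFolder: String, filename: String): Unit = {
  df.coalesce(1).write.format("avro").mode("overwrite").save(tmpFolder)
  val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
  val partFile = fs.globStatus(new Path(s"$tmpFolder/part-*"))(0).getPath
  fs.rename(partFile, new Path(filename))  // assumes the destination's parent dir exists
  fs.delete(new Path(tmpFolder), true)     // clean up the temp output
}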

#!/bin/sh
# run_script: launches spark-shell with the log4j config and the spark-daria /
# spark-avro packages, then loads the given Scala script via -i.
BASEDIR=$(dirname "$0")
LOG_PROPS="$BASEDIR/../log4j.properties"
spark-shell --driver-memory=8g \
  --files "$LOG_PROPS" \
  --conf "spark.executor.extraJavaOptions='-Dlog4j.configuration=log4j.properties'" \
  --driver-java-options "-Dlog4j.configuration=file:$LOG_PROPS" \
  --conf spark.sql.session.timeZone=UTC \
  --packages com.github.mrpowers:spark-daria_2.12:1.0.0,org.apache.spark:spark-avro_2.12:3.1.2 \
  -i "$@"

Example invocation:
./etl/bin/parquet_to_avro "$PARQUET_TEST_DATA/data/*.parquet" "$ARROW_TEST_DATA/data/avro"
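
After a run, a quick sanity check is to read one converted file back and compare row counts against its Parquet source. A minimal sketch with hypothetical paths, assuming the same spark-shell session and packages that run_script sets up:

// Hypothetical input/output pair; substitute real paths from a run.
val parquetDf = spark.read.parquet("/data/example.parquet")
val avroDf = spark.read.format("avro").load("/out/avro/example.avro")
assert(parquetDf.count() == avroDf.count(), "row count mismatch after conversion")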