Example generic Spark setup that uses Avro for schemas and the Parquet file format for storage
@namespace("com.example.avro.parquet.spark")
protocol HTTP {
  record Header {
    string name;
    string value;
  }

  record Request {
    string method;
    string path;
    string query;
    array<Header> headers;
    bytes body;
  }

  record Response {
    int status;
    array<Header> headers;
    bytes body;
  }
}
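The Spark code below assumes that Avro code generation (for example via avro-tools or a build-tool Avro plugin) has already turned this protocol into SpecificRecord classes (Header, Request, Response). Generated classes have a public no-arg constructor and carry their schema, which is what saveAsParquetFile relies on later; a small illustrative check, assuming those generated classes are on the classpath:

// Assumes the classes generated from the protocol above are available.
val requestSchema = new Request().getSchema()   // org.apache.avro.Schema of the Request record
println(requestSchema.toString(true))           // pretty-print the schema as JSON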
// A SparquetContext can read and write Parquet files as RDDs of different Avro record types
val sc = new SparquetContext("Example")
val requests: RDD[Request] = sc.parquetFile[Request]("/http/requests.parquet")
val responses: RDD[Response] = requests.map(intoResponse)
sc.saveAsParquetFile(responses, "/http/responses.parquet")
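The mapping function intoResponse is not included in the gist. A minimal sketch, assuming the builder API that Avro code generation produces for Request and Response, could look like this:

// Hypothetical Request => Response mapping using the Avro-generated builders.
def intoResponse(request: Request): Response =
  Response.newBuilder()
    .setStatus(200)
    .setHeaders(request.getHeaders)               // echo the request headers back
    .setBody(java.nio.ByteBuffer.allocate(0))     // empty body for the sketch
    .build()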
package com.example.avro.parquet.spark

import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import parquet.avro.{AvroParquetOutputFormat, AvroWriteSupport, AvroReadSupport}
import parquet.hadoop.{ParquetOutputFormat, ParquetInputFormat}

import scala.reflect.ClassTag

class SparquetContext(name: String, config: Map[String, String]) {

  def this(name: String) = this(name, Map())

  val conf = new SparkConf()
    .setAppName(name)
    .setAll(config)
    .set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName)
    .set("spark.kryo.registrator", classOf[com.example.avro.parquet.spark.KryoProtocolRegistrator].getName)

  val spark: SparkContext = new SparkContext(conf)

  // Read a Parquet file into an RDD of Avro records of type T.
  def parquetFile[T](path: String)(implicit m: ClassTag[T]): RDD[T] = {
    val job = Job.getInstance(spark.hadoopConfiguration)
    ParquetInputFormat.setReadSupportClass(job, classOf[AvroReadSupport[T]])
    val file = spark.newAPIHadoopFile(
      path,
      classOf[ParquetInputFormat[T]],
      classOf[Void],
      m.runtimeClass.asInstanceOf[Class[T]],
      job.getConfiguration)
    file.map { case (_, record) => record }
  }

  // Write an RDD of Avro records to a Parquet file. T needs a no-arg constructor
  // so its Avro schema can be obtained via newInstance().
  def saveAsParquetFile[T <: IndexedRecord](records: RDD[T], path: String)(implicit m: ClassTag[T]) = {
    val keyedRecords: RDD[(Void, T)] = records.map(record => (null, record))
    spark.hadoopConfiguration.setBoolean("parquet.enable.summary-metadata", false)
    val job = Job.getInstance(spark.hadoopConfiguration)
    ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
    AvroParquetOutputFormat.setSchema(job, m.runtimeClass.newInstance().asInstanceOf[IndexedRecord].getSchema())
    keyedRecords.saveAsNewAPIHadoopFile(
      path,
      classOf[Void],
      m.runtimeClass.asInstanceOf[Class[T]],
      classOf[ParquetOutputFormat[T]],
      job.getConfiguration
    )
  }

  def stop() = spark.stop()
}
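The KryoProtocolRegistrator referenced in the SparkConf above is not part of the gist. A minimal sketch that simply registers the generated record classes with Kryo (a production setup might prefer a dedicated Avro serializer such as chill-avro):

package com.example.avro.parquet.spark

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Minimal sketch of the registrator named in spark.kryo.registrator above.
// Registering the generated Avro record classes avoids shipping their
// fully-qualified class names with every serialized record.
class KryoProtocolRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Header])
    kryo.register(classOf[Request])
    kryo.register(classOf[Response])
  }
}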