// Created June 22, 2016 17:58
import org.apache.crunch.Source
import org.apache.avro.mapred.AvroKey
object SparkRunner {
def readInput[S <: SpecificRecord: ClassTag](spark: SparkContext, source: Source[S]): RDD[S] = {
val job = Job.getInstance(spark.hadoopConfiguration)
source.configureSource(job, -1)
val input = spark.newAPIHadoopRDD(
classOf[CrunchInputFormat[AvroKey[S], NullWritable]],
classOf[NullWritable]) => x._1.datum()).map(x => SpecificData.get().deepCopy(x.getSchema, x))
