Create a gist now

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import org.apache.crunch.Source
import org.apache.crunch.impl.mr.run.CrunchInputFormat
import org.apache.avro.mapred.AvroKey
import org.apache.hadoop.io.NullWritable
...
object SparkRunner {
  /** Reads a Crunch [[org.apache.crunch.Source]] as a Spark RDD of Avro specific records.
    *
    * The source configures a fresh Hadoop `Job`, which is then handed to Spark's
    * new-API Hadoop reader using Crunch's `CrunchInputFormat`.
    *
    * @param spark  the active SparkContext whose Hadoop configuration seeds the job
    * @param source the Crunch source describing what to read
    * @return an RDD of deep-copied specific records
    */
  def readInput[S <: SpecificRecord: ClassTag](spark: SparkContext, source: Source[S]): RDD[S] = {
    // Let the Crunch source write its input settings into a fresh Hadoop job.
    val job = Job.getInstance(spark.hadoopConfiguration)
    // NOTE(review): -1 appears to mean "no node index / single input" — confirm against Crunch docs.
    source.configureSource(job, -1)
    val keyValues = spark.newAPIHadoopRDD(
      job.getConfiguration,
      classOf[CrunchInputFormat[AvroKey[S], NullWritable]],
      classOf[AvroKey[S]],
      classOf[NullWritable])
    // The Hadoop record reader reuses its key instance, so each datum is
    // deep-copied before it escapes the partition iterator.
    keyValues.map { case (avroKey, _) =>
      val record = avroKey.datum()
      SpecificData.get().deepCopy(record.getSchema, record)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment