Skip to content

Instantly share code, notes, and snippets.

@yanyang82
Created June 22, 2016 17:58
Show Gist options
  • Save yanyang82/7f534285f7d0bb6a22e5f2a420fc90d8 to your computer and use it in GitHub Desktop.
Save yanyang82/7f534285f7d0bb6a22e5f2a420fc90d8 to your computer and use it in GitHub Desktop.
import org.apache.crunch.Source
import org.apache.crunch.impl.mr.run.CrunchInputFormat
import org.apache.avro.mapred.AvroKey
import org.apache.hadoop.io.NullWritable
...
/** Helpers for feeding Apache Crunch sources into Apache Spark. */
object SparkRunner {

  /**
   * Loads the Avro specific records of a Crunch [[Source]] as a Spark [[RDD]].
   *
   * A new Hadoop `Job` is created from `spark.hadoopConfiguration`, and the source writes its
   * input settings into that job. The job's configuration is then read with
   * `CrunchInputFormat` as `(AvroKey[S], NullWritable)` pairs. Only the Avro datum of each
   * key is kept, and every datum is deep-copied with `SpecificData` before it is emitted.
   *
   * @param spark  the Spark context whose Hadoop configuration seeds the job
   * @param source the Crunch source to read; presumably an Avro-backed source whose keys are
   *               `AvroKey[S]` — TODO confirm for each source type passed in
   * @tparam S     the Avro `SpecificRecord` type carried by `source`
   * @return an RDD with one deep-copied record per input record
   */
  def readInput[S <: SpecificRecord: ClassTag](spark: SparkContext, source: Source[S]): RDD[S] = {
    val hadoopJob = Job.getInstance(spark.hadoopConfiguration)
    // NOTE(review): -1 is the input id handed to Crunch; confirm its meaning for the
    // Crunch version in use before changing it.
    source.configureSource(hadoopJob, -1)

    val formatClass = classOf[CrunchInputFormat[AvroKey[S], NullWritable]]
    val keyClass = classOf[AvroKey[S]]
    val valueClass = classOf[NullWritable]
    val rawPairs =
      spark.newAPIHadoopRDD(hadoopJob.getConfiguration, formatClass, keyClass, valueClass)

    // Spark's newAPIHadoopRDD docs warn that Hadoop record readers may re-use the same
    // object for every record, so each datum is copied before it leaves this step.
    rawPairs.map { pair =>
      val record = pair._1.datum()
      SpecificData.get().deepCopy(record.getSchema, record)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment