@rjurney
Last active August 29, 2015 14:01
Unable to read an Avro file in Scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.commons.lang.StringEscapeUtils.escapeCsv
// Plain-text reads work fine: split the tab-delimited beaconing activity log
val file = sc.textFile("hdfs://hivecluster2/securityx/beaconing_activity.txt/2014/05/12/14/hour")
val arys = file.map{line => line.split("\t")}

// Attempt to read an Avro container file with the new Hadoop API input format
val avroRdd = sc.newAPIHadoopFile("hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/27/19/part-m-00019.avro",
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable])
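If the read succeeded, each element would be an (AvroKey[GenericRecord], NullWritable) pair; a minimal sketch of pulling a field out of the records (the "host" field name is hypothetical, not from this dataset):

// Unwrap the GenericRecord from each AvroKey and read a field by name ("host" is hypothetical)
val hosts = avroRdd.map { case (key, _) => key.datum().get("host").toString }
hosts.take(5).foreach(println)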
// spark-shell -usejavacp -classpath "*.jar"
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
import java.net.URI
import java.io.BufferedInputStream
import java.io.File
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.file.DataFileStream
import org.apache.avro.io.DatumReader
import org.apache.avro.file.DataFileReader
// Try reading the same Avro file directly with Avro's DataFileReader
val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/27/19/part-m-00019.avro"
val inPath = new Path(input);
val file = new File(input)
val fs = FileSystem.get(URI.create(input), new Configuration());
//val inStream = new BufferedInputStream(fs.open(inPath));
val reader = new SpecificDatumReader
val fileReader = new DataFileReader(file, reader);
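As an aside, java.io.File cannot open an hdfs:// URI; Avro ships an FsInput that wraps an HDFS path as a SeekableInput for DataFileReader. A minimal sketch of that alternative, reusing inPath from above:

import org.apache.avro.mapred.FsInput
// FsInput gives DataFileReader a seekable view onto the HDFS file
val seekable = new FsInput(inPath, new Configuration())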
scala> val avroRdd = sc.newAPIHadoopFile("hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/27/19/part-m-00019.avro",
| classOf[AvroKeyInputFormat[GenericRecord]],
| classOf[AvroKey[GenericRecord]],
| classOf[NullWritable])
14/05/27 19:26:23 INFO storage.MemoryStore: ensureFreeSpace(167634) called with curMem=0, maxMem=308713881
14/05/27 19:26:23 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 163.7 KB, free 294.3 MB)
avroRdd: org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)] = NewHadoopRDD[0] at newAPIHadoopFile at <console>:31
scala> avroRdd.first()
14/05/27 19:26:30 INFO input.FileInputFormat: Total input paths to process : 1
14/05/27 19:26:30 INFO spark.SparkContext: Starting job: first at <console>:34
14/05/27 19:26:30 INFO scheduler.DAGScheduler: Got job 0 (first at <console>:34) with 1 output partitions (allowLocal=true)
14/05/27 19:26:30 INFO scheduler.DAGScheduler: Final stage: Stage 0 (first at <console>:34)
14/05/27 19:26:30 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/05/27 19:26:30 INFO scheduler.DAGScheduler: Missing parents: List()
14/05/27 19:26:30 INFO scheduler.DAGScheduler: Computing the requested partition locally
14/05/27 19:26:30 INFO rdd.NewHadoopRDD: Input split: hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/27/19/part-m-00019.avro:0+35046663
Exception in thread "Local computation of job 0" java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
	at org.apache.avro.mapreduce.AvroKeyInputFormat.createRecordReader(AvroKeyInputFormat.java:47)
	at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:94)
	at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:84)
	at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:48)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
	at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:694)
	at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:679)
scala> val reader = new SpecificDatumReader
reader: org.apache.avro.specific.SpecificDatumReader[Nothing] = org.apache.avro.specific.SpecificDatumReader@223477
scala> val fileReader = new DataFileReader(file, reader);
<console>:26: error: type mismatch;
found : org.apache.avro.specific.SpecificDatumReader[Nothing]
required: org.apache.avro.io.DatumReader[D]
Note: Nothing <: D, but Java-defined trait DatumReader is invariant in type D.
You may wish to investigate a wildcard type such as `_ <: D`. (SLS 3.2.10)
val fileReader = new DataFileReader(file, reader);
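The type mismatch comes from `new SpecificDatumReader` inferring Nothing as its type parameter. Giving the reader an explicit type and opening the file through FsInput (sketched above) sidesteps both the inference problem and the java.io.File issue; a hedged sketch:

// Explicit type parameter so the reader isn't inferred as Nothing
val reader = new GenericDatumReader[GenericRecord]()
// Open the HDFS file through the seekable FsInput rather than java.io.File
val fileReader = DataFileReader.openReader(seekable, reader)
val firstRecord = fileReader.next()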
friso commented Oct 27, 2014

In case anyone is Googling, this is the issue: https://issues.apache.org/jira/browse/SPARK-3039

Spark pulls in an Avro MapReduce build through the Hive dependency, but avro-mapred comes in two flavors: hadoop1 and hadoop2. Without specifying a classifier, you get the hadoop1 build.
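In practice the workaround is to depend on the hadoop2 flavor explicitly; a sketch of the sbt line (the Avro version here is only an example):

// build.sbt: request the hadoop2 build of avro-mapred explicitly
libraryDependencies += "org.apache.avro" % "avro-mapred" % "1.7.7" classifier "hadoop2"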
