rjurney/avro.scala
Last active Aug 29, 2015

Unable to read an Avro in Scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.commons.lang.StringEscapeUtils.escapeCsv

// Plain-text input works fine: read a tab-delimited file into an RDD of field arrays.
val file = sc.textFile("hdfs://hivecluster2/securityx/beaconing_activity.txt/2014/05/12/14/hour")
val arys = file.map(line => line.split("\t"))

// Avro input is the problem: load an Avro container file as
// (AvroKey[GenericRecord], NullWritable) pairs via the new Hadoop API.
// This fails at runtime -- see the transcript below.
val avroRdd = sc.newAPIHadoopFile("hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/27/19/part-m-00019.avro",
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable])
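// If the load succeeded, fields could be read from each record's datum.
// A minimal sketch -- "request_url" is a hypothetical field name, not
// necessarily present in the web_proxy_mef schema.
val urls = avroRdd.map { case (key, _) => key.datum().get("request_url").toString }
urls.take(10).foreach(println)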
// spark-shell -usejavacp -classpath "*.jar"
// Second attempt: read the Avro container file directly with the Avro file APIs.
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
import java.net.URI
import java.io.BufferedInputStream
import java.io.File
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.file.DataFileStream
import org.apache.avro.io.DatumReader
import org.apache.avro.file.DataFileReader

val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/27/19/part-m-00019.avro"
val inPath = new Path(input)
// Note: java.io.File expects a local path, so wrapping an hdfs:// URI in File
// can never read from HDFS.
val file = new File(input)
val fs = FileSystem.get(URI.create(input), new Configuration())
//val inStream = new BufferedInputStream(fs.open(inPath))
// No type argument here, so Scala infers SpecificDatumReader[Nothing], which fails
// to typecheck against DataFileReader's invariant DatumReader[D] -- see below.
val reader = new SpecificDatumReader
val fileReader = new DataFileReader(file, reader)
scala> val avroRdd = sc.newAPIHadoopFile("hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/27/19/part-m-00019.avro",
| classOf[AvroKeyInputFormat[GenericRecord]],
| classOf[AvroKey[GenericRecord]],
| classOf[NullWritable])
14/05/27 19:26:23 INFO storage.MemoryStore: ensureFreeSpace(167634) called with curMem=0, maxMem=308713881
14/05/27 19:26:23 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 163.7 KB, free 294.3 MB)
avroRdd: org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)] = NewHadoopRDD[0] at newAPIHadoopFile at <console>:31
scala> avroRdd.first()
14/05/27 19:26:30 INFO input.FileInputFormat: Total input paths to process : 1
14/05/27 19:26:30 INFO spark.SparkContext: Starting job: first at <console>:34
14/05/27 19:26:30 INFO scheduler.DAGScheduler: Got job 0 (first at <console>:34) with 1 output partitions (allowLocal=true)
14/05/27 19:26:30 INFO scheduler.DAGScheduler: Final stage: Stage 0 (first at <console>:34)
14/05/27 19:26:30 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/05/27 19:26:30 INFO scheduler.DAGScheduler: Missing parents: List()
14/05/27 19:26:30 INFO scheduler.DAGScheduler: Computing the requested partition locally
14/05/27 19:26:30 INFO rdd.NewHadoopRDD: Input split: hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/27/19/part-m-00019.avro:0+35046663
Exception in thread "Local computation of job 0" java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
at org.apache.avro.mapreduce.AvroKeyInputFormat.createRecordReader(AvroKeyInputFormat.java:47)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:94)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:84)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:48)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:694)
at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:679)
scala> val reader = new SpecificDatumReader
reader: org.apache.avro.specific.SpecificDatumReader[Nothing] = org.apache.avro.specific.SpecificDatumReader@223477
scala> val fileReader = new DataFileReader(file, reader);
<console>:26: error: type mismatch;
found : org.apache.avro.specific.SpecificDatumReader[Nothing]
required: org.apache.avro.io.DatumReader[D]
Note: Nothing <: D, but Java-defined trait DatumReader is invariant in type D.
You may wish to investigate a wildcard type such as `_ <: D`. (SLS 3.2.10)
val fileReader = new DataFileReader(file, reader);
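
This second failure is a Scala type-inference issue rather than a classpath one: with no type argument, `new SpecificDatumReader` infers `SpecificDatumReader[Nothing]`, and the Java-defined `DatumReader[D]` is invariant in `D`. A minimal working sketch under the imports above, reading generic records through Avro's `FsInput` (which, unlike `java.io.File`, understands HDFS paths; assumes avro-mapred is on the classpath):

import org.apache.avro.mapred.FsInput
import scala.collection.JavaConverters._

// Parameterize the reader explicitly so Scala doesn't infer Nothing.
val genericReader = new GenericDatumReader[GenericRecord]()
// FsInput wraps an HDFS path as the SeekableInput that DataFileReader needs.
val seekable = new FsInput(inPath, new Configuration())
val dataReader = DataFileReader.openReader(seekable, genericReader)
dataReader.iterator().asScala.take(5).foreach(println) // peek at a few records
dataReader.close()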
friso commented Oct 27, 2014

In case anyone is Googling, this is the issue: https://issues.apache.org/jira/browse/SPARK-3039

Spark pulls in an Avro MapReduce build through its Hive dependency, but Avro MapReduce comes in two flavors: hadoop1 and hadoop2. Without specifying a classifier, you get the hadoop1 build.
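
A common workaround from that JIRA thread is to depend explicitly on the hadoop2 flavor of avro-mapred so it wins over the transitive hadoop1 artifact. A minimal sbt sketch; the version shown is illustrative:

// build.sbt -- force the hadoop2 build of avro-mapred (version is illustrative)
libraryDependencies += "org.apache.avro" % "avro-mapred" % "1.7.7" classifier "hadoop2"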
