avro.scala by @rjurney, created Jun 3, 2014
Loading Avros in Spark Shell...
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapred.AvroInputFormat
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.commons.lang.StringEscapeUtils.escapeCsv
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import java.io.BufferedInputStream
import org.apache.avro.file.DataFileStream
import org.apache.avro.io.DatumReader
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.mapred.FsInput
import org.apache.avro.Schema
import org.apache.avro.Schema.Parser
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.FileInputFormat
sc.addJar("avro-1.7.6.jar")
sc.addJar("avro-mapred-1.7.6.jar")
val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/"//part-m-000{15,16}.avro"
// Set up the input path for the job via a Hadoop JobConf
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.setJobName("Test Scala Job")
FileInputFormat.setInputPaths(jobConf, input)
// Each record arrives as (AvroWrapper[GenericRecord], NullWritable):
// AvroInputFormat puts the datum in the key and leaves the value empty.
val rdd = sc.hadoopRDD(
  jobConf,
  classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],
  classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],
  classOf[org.apache.hadoop.io.NullWritable],
  1) // minimum number of partitions
// This throws: AvroWrapper is not Serializable (stack trace below).
val f1 = rdd.first
java.io.NotSerializableException (java.io.NotSerializableException: org.apache.avro.mapred.AvroWrapper)
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:28)
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:48)
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:223)
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46)
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:45)
java.security.AccessController.doPrivileged(Native Method)
javax.security.auth.Subject.doAs(Subject.java:415)
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:45)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744)
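The stack trace points at the root cause: org.apache.avro.mapred.AvroWrapper (and the GenericRecord inside it) does not implement java.io.Serializable, and first() Java-serializes a record from an executor back to the driver. A common workaround (a sketch, not the only fix) is to convert each record into a plain serializable value on the executors, so no Avro class ever crosses the wire:

// Render each datum as a String (which is Serializable) inside the map.
val records = rdd.map { case (wrapper, _) => wrapper.datum.toString }
records.first

The same idea works for individual fields; the field name "url" below is hypothetical:

val urls = rdd.map { case (wrapper, _) => String.valueOf(wrapper.datum.get("url")) }
urls.first

Converting inside the map also sidesteps Hadoop's record-object reuse, since each String is a fresh copy. Another route is registering the Avro types with Kryo (for example via Twitter's chill-avro), but the map-to-Serializable approach needs no extra dependencies.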