@MLnick
Created June 21, 2014 05:40
package example

import java.io.File

import org.apache.avro.Schema.Parser
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.GenericData.Record
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.spark.SparkContext
import org.apache.spark.api.python.Converter

object WriteAvro extends App {
  /*
   The schema at /tmp/avro/schema/user.avsc:

   {"namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
      {"name": "name", "type": "string"},
      {"name": "favorite_number", "type": ["int", "null"]},
      {"name": "favorite_color", "type": ["string", "null"]}
    ]
   }
  */
  val path = "/tmp/avro/schema/user.avsc"
  val schema = new Parser().parse(new File(path))

  // Build two generic records against the schema; union fields that are
  // never set (e.g. user1's favorite_color) default to null.
  val user1 = new Record(schema)
  user1.put("name", "Alyssa")
  user1.put("favorite_number", 256)

  val user2 = new Record(schema)
  user2.put("name", "Ben")
  user2.put("favorite_number", 7)
  user2.put("favorite_color", "red")

  // Write both records to a single Avro container file.
  val file = new File("/tmp/avro/users/users.avro")
  val datumWriter = new GenericDatumWriter[GenericRecord](schema)
  val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
  dataFileWriter.create(schema, file)
  dataFileWriter.append(user1)
  dataFileWriter.append(user2)
  dataFileWriter.close()
}
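
Before involving Spark, it can be worth reading the file straight back with Avro's DataFileReader. A minimal sketch (a hypothetical VerifyAvro object, not part of the original gist):

import java.io.File

import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

// Sketch: print every record in the file just written. The reader takes the
// schema from the file header, so the .avsc file is not needed here.
object VerifyAvro extends App {
  val reader = new DataFileReader[GenericRecord](
    new File("/tmp/avro/users/users.avro"), new GenericDatumReader[GenericRecord]())
  try {
    while (reader.hasNext) println(reader.next())
  } finally {
    reader.close()
  }
}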
// Key converter for PySpark: flattens an AvroKey[GenericRecord] into a
// java.util.Map of field name -> value, driven by the record's own schema.
class AvroConverter extends Converter[AvroKey[GenericRecord], java.util.Map[String, Any]] {
  import collection.JavaConversions._

  override def convert(obj: AvroKey[GenericRecord]): java.util.Map[String, Any] = {
    val record = obj.datum()
    val schema = record.getSchema
    mapAsJavaMap(schema.getFields.map { f => (f.name, record.get(f.name)) }.toMap)
  }
}
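
Note that convert only maps top-level fields, so a nested record would pass through as a raw GenericRecord. A sketch of a recursive variant (hypothetical, not part of the gist) might look like this:

import org.apache.avro.generic.GenericRecord
import collection.JavaConversions._

// Hypothetical helper: recursively converts nested records to java.util.Maps.
// Avro arrays, maps and fixed types would still need cases of their own.
object RecursiveConvert {
  def toJava(value: Any): Any = value match {
    case r: GenericRecord =>
      mapAsJavaMap(r.getSchema.getFields.map(f => (f.name, toJava(r.get(f.name)))).toMap)
    case other => other
  }
}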
object ReadAvro extends App {
  val sc = new SparkContext("local[2]", "TestAvro")
  val path = "/tmp/avro/users/users.avro"

  // Read the file through the new Hadoop API: keys are AvroKey[GenericRecord],
  // values are NullWritable (AvroKeyInputFormat carries no values).
  val avroRDD = sc.newAPIHadoopFile(path,
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable])

  // Apply the same converter PySpark would use, then print the records.
  val userRDD = avroRDD.map { case (k, v) => new AvroConverter().convert(k) }
  println(userRDD.collect().mkString("\n"))
}
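
The raw records can also be queried without going through the converter. A minimal follow-on sketch (assuming the avroRDD above, inside ReadAvro); the field is extracted to a plain String inside the map, since Hadoop input formats may reuse the underlying record objects:

// Sketch: count users per favorite_color, treating an unset color as "none".
// Avro strings arrive as org.apache.avro.util.Utf8, hence the toString.
val byColor = avroRDD
  .map { case (k, _) =>
    Option(k.datum().get("favorite_color")).map(_.toString).getOrElse("none")
  }
  .countByValue()
println(byColor)  // e.g. Map(red -> 1, none -> 1)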
Nicks-MacBook-Pro:incubator-spark-mlnick Nick$ SPARK_CLASSPATH=/Users/Nick/workspace/scala/avro-example/target/avro-0.1.jar \
    IPYTHON=1 ./bin/pyspark
Python 2.7.6 |Anaconda 1.8.0 (x86_64)| (default, Jan 10 2014, 11:23:15)
Type "copyright", "credits" or "license" for more information.
IPython 2.0.0 -- An enhanced Interactive Python.
? -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help -> Python's own help system.
object? -> Details about 'object', use 'object??' for extra details.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.0.0-SNAPSHOT
      /_/
Using Python version 2.7.6 (default, Jan 10 2014 11:23:15)
SparkContext available as sc.
In [1]: path = "/tmp/avro/users/users.avro"
In [2]: avro = sc.newAPIHadoopFile(path, "org.apache.avro.mapreduce.AvroKeyInputFormat",
   ...:     "org.apache.avro.mapred.AvroKey", "org.apache.hadoop.io.NullWritable", "example.AvroConverter")
In [3]: avro.collect()
Out[3]:
[(u'{name=Alyssa, favorite_number=256, favorite_color=null}', None),
 (u'{name=Ben, favorite_number=7, favorite_color=red}', None)]
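
Each tuple pairs a converted record with None: AvroKeyInputFormat produces NullWritable values, which PySpark surfaces as None, while the converted java.util.Maps show up on the Python side in their toString form.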
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>example</groupId>
  <artifactId>avro</artifactId>
  <version>0.1</version>
  <inceptionYear>2014</inceptionYear>
  <properties>
    <scala.version>2.10.4</scala.version>
  </properties>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.1.0-SNAPSHOT</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>1.0.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.7.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-mapred</artifactId>
      <version>1.7.6</version>
    </dependency>
    <!-- TEST -->
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_2.10</artifactId>
      <version>2.1.0</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <!-- disable surefire -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.7</version>
        <configuration>
          <skipTests>true</skipTests>
        </configuration>
      </plugin>
      <!-- enable scalatest -->
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <version>1.0-RC1</version>
        <configuration>
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
          <junitxml>.</junitxml>
          <filereports>WDF TestSuite.txt</filereports>
        </configuration>
        <executions>
          <execution>
            <id>test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.1.6</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <!-- <args>
            <arg>-target:jvm-1.7</arg>
          </args> -->
          <jvmArgs>
            <jvmArg>-Xmx3g</jvmArg>
          </jvmArgs>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.1</version>
        <configuration>
          <!-- strip signature files so the shaded jar is not rejected at runtime -->
          <filters>
            <filter>
              <artifact>*:*</artifact>
              <excludes>
                <exclude>META-INF/*.SF</exclude>
                <exclude>META-INF/*.DSA</exclude>
                <exclude>META-INF/*.RSA</exclude>
              </excludes>
            </filter>
          </filters>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.1</version>
        <configuration>
          <archive>
            <manifest>
              <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
              <addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
            </manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>
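
For reference, mvn package runs the shade plugin during the package phase and produces target/avro-0.1.jar, the jar placed on SPARK_CLASSPATH in the PySpark session above.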